Belle II Software  light-2403-persian
__init__.py
1 #!/usr/bin/env python3
2 
3 
10 
11 """
12 conditions_db
13 -------------
14 
15 Python interface to the ConditionsDB
16 """
17 
18 import os
19 import stat
20 from basf2 import B2FATAL, B2ERROR, B2INFO, B2WARNING, conditions
21 import requests
22 from requests.packages.urllib3.fields import RequestField
23 from requests.packages.urllib3.filepost import encode_multipart_formdata
24 import json
25 import urllib
26 from versioning import upload_global_tag, jira_global_tag_v2
27 from collections import defaultdict
28 from concurrent.futures import ThreadPoolExecutor, wait as futures_wait
29 import hashlib
30 import itertools
31 from typing import Union # noqa
32 import getpass
33 import tempfile
34 from conditions_db.iov import IntervalOfValidity
35 
36 
def encode_name(name):
    """Percent-encode ``name`` so it can be embedded in a url.

    No character is treated as safe, so ``/`` is escaped as well.
    """
    escaped = urllib.parse.quote(name, safe="")
    return escaped
40 
41 
def file_checksum(filename, chunk_size=1 << 20):
    """Calculate the md5 hash of a file.

    The file is read in pieces so that large payload files do not have to be
    held in memory completely.

    Arguments:
        filename (str): path of the file to hash
        chunk_size (int): number of bytes to read per step

    Returns:
        the hexadecimal md5 digest of the file contents
    """
    md5hash = hashlib.md5()
    with open(filename, "rb") as data:
        # read() returns b"" at the end of the file which stops the iteration
        for block in iter(lambda: data.read(chunk_size), b""):
            md5hash.update(block)
    return md5hash.hexdigest()
48 
49 
def chunks(container, chunk_size):
    """Yield the elements of ``container`` as tuples of at most ``chunk_size`` items."""
    iterator = iter(container)
    # iter(callable, sentinel) ends as soon as an empty tuple is produced,
    # i.e. once the underlying iterator is exhausted
    yield from iter(lambda: tuple(itertools.islice(iterator, chunk_size)), ())
58 
59 
def get_cdb_authentication_token(path=None):
    """
    Helper function for correctly retrieving the CDB authentication token (either via file or via issuing server).

    An existing token file is checked against the token server first; if the
    server reports it as invalid the file is removed and a new token is
    requested interactively (user name and password are asked on the terminal).

    :param path: Path to a file containing a CDB authentication token; if None, the function will use
                 a default path (``${HOME}/b2cdb_${BELLE2_USER}.token``, or the same file name in the
                 system temporary directory if ``HOME`` is not set) to look for a token.
    :return: The token as string or None if no token could be obtained.
    """
    # if we pass a path, let's use it for getting the token, otherwise use the default one
    if path:
        path_to_token = path
    else:
        path_to_token = os.path.join(os.getenv('HOME', tempfile.gettempdir()), f'b2cdb_{os.getenv("BELLE2_USER", None)}.token')

    # NOTE(review): both requests below are sent with verify=False, i.e. without
    # checking the TLS certificate of the token server although a token or a
    # password is transmitted. Left unchanged here, please confirm it is intended.

    # check validity of existing token
    if os.path.isfile(path_to_token):
        with open(path_to_token) as token_file:
            existing_token = token_file.read().strip()
        response = requests.get('https://token.belle2.org/check', verify=False, params={'token': existing_token})
        if response.status_code == 400:
            B2INFO(f'The file {path_to_token} contains an invalid token, getting a new token...')
            os.unlink(path_to_token)
        elif response.status_code > 400:
            B2WARNING("The validity of the existing token could not be checked. Trying to connect to CDB anyway.")

    # request a token if there is none
    if not os.path.isfile(path_to_token):
        username = os.environ['BELLE2_USER']
        othername = input(f'\nConfirm your DESY user name by pressing enter or type the correct one [{username}]: ')
        if othername != '':
            username = othername
        password = getpass.getpass("Please type your DESY password to request a CDB token: ")
        response = requests.get('https://token.belle2.org', verify=False, auth=(username, password))
        # NOTE(review): this prints the raw server reply, which is the token itself on success
        print(response.content)
        if response.status_code == requests.codes.ok:
            # create the file readable and writable by the owner only right from
            # the start so the token is never stored with wider permissions
            owner_only = stat.S_IRUSR | stat.S_IWUSR
            descriptor = os.open(path_to_token, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, owner_only)
            with os.fdopen(descriptor, 'wb') as token_file:
                # the mode given to os.open() only applies to newly created files
                os.chmod(path_to_token, owner_only)
                token_file.write(response.content)
        else:
            B2ERROR('Failed to get token')
            return None

    # read and return token
    with open(path_to_token) as token_file:
        return token_file.read().strip()
104 
105 
def set_cdb_authentication_token(cdb_instance, auth_token=None):
    """
    Hand a CDB authentication token to a database interface object.

    :param cdb_instance: An instance of the ``ConditionsDB`` class.
    :param auth_token: The CDB authentication token to use. If ``None`` the token is obtained with
                       ``get_cdb_authentication_token()``, using the file named by the environment
                       variable ``BELLE2_CDB_AUTH_TOKEN`` if that is set.
    """
    if auth_token is None:
        # If something goes wrong with the auth. token, the function returns None and the authentication will fail
        auth_token = get_cdb_authentication_token(os.getenv('BELLE2_CDB_AUTH_TOKEN', default=None))
    cdb_instance.set_authentication_token(auth_token)
120 
121 
class BearerAuth(requests.auth.AuthBase):
    """Authentication helper for requests which sends a bearer token instead of username/password"""

    def __init__(self, token):
        """Remember the header value built from the given token"""
        #: complete value of the authorization header
        self._authtoken_authtoken = "Bearer {}".format(token)

    def __call__(self, r):
        """Add the token header to the outgoing request and return the request"""
        header_value = self._authtoken_authtoken
        r.headers["X-Authorization"] = header_value
        return r
134 
135 
class PayloadInformation:
    """Small container class to help compare payload information for efficient
    comparison between globaltags.

    Two instances compare equal (and hash identically) if name, checksum and
    iov agree. They are ordered by lower case name, iov and revision.
    """

    @classmethod
    def from_json(cls, payload, iov=None):
        """Set all internal members from the json information of the payload and the iov.

        Arguments:
            payload (dict): json information of the payload as returned by REST api
            iov (dict): json information of the iov as returned by REST api.
                If None, the iov id and all four iov boundaries are set to None.
        """
        if iov is None:
            iov = {"payloadIovId": None, "expStart": None, "runStart": None, "expEnd": None, "runEnd": None}

        return cls(
            payload['payloadId'],
            payload['basf2Module']['name'],
            payload['revision'],
            payload['checksum'],
            payload['payloadUrl'],
            payload['baseUrl'],
            iov['payloadIovId'],
            (iov["expStart"], iov["runStart"], iov["expEnd"], iov["runEnd"]),
        )

    def __init__(self, payload_id, name, revision, checksum, payload_url, base_url, iov_id=None, iov=None):
        """
        Create a new object from the given information

        Arguments:
            payload_id: id of the payload
            name: name of the payload
            revision: revision of the payload
            checksum: checksum of the payload
            payload_url: url of the payload, joined with base_url by `url`
            base_url: base url for payload_url
            iov_id: id of the iov, None if not known
            iov: tuple (first exp, first run, final exp, final run) or None
        """
        #: name of the payload
        self.name = name
        #: checksum of the payload
        self.checksum = checksum
        #: interval of validity as tuple of four values, or None
        self.iov = iov
        #: revision of the payload
        self.revision = revision
        #: id of the payload
        self.payload_id = payload_id
        #: id of the iov, None if not known
        self.iov_id = iov_id
        #: base url used to build the full url of the payload
        self.base_url = base_url
        #: url of the payload relative to the base url
        self.payload_url = payload_url

    @property
    def url(self):
        """Return the full url to the payload on the server"""
        return urllib.parse.urljoin(self.base_url + '/', self.payload_url)

    def __hash__(self):
        """Make object hashable, consistent with the equality comparison"""
        return hash((self.name, self.checksum, self.iov))

    def __eq__(self, other):
        """Check if two payloads are equal, i.e. agree in name, checksum and iov"""
        return (self.name, self.checksum, self.iov) == (other.name, other.checksum, other.iov)

    def __lt__(self, other):
        """Sort payloads by name, iov, revision"""
        return (self.name.lower(), self.iov, self.revision) < (other.name.lower(), other.iov, other.revision)

    def readable_iov(self):
        """return a human readable name for the IoV"""
        if self.iov is None:
            return "none"

        if self.iov == (0, 0, -1, -1):
            return "always"

        e1, r1, e2, r2 = self.iov
        if e1 == e2:
            # validity within one experiment
            if r1 == 0 and r2 == -1:
                return f"exp {e1}"
            elif r2 == -1:
                return f"exp {e1}, runs {r1}+"
            elif r1 == r2:
                return f"exp {e1}, run {r1}"
            else:
                return f"exp {e1}, runs {r1} - {r2}"
        else:
            # validity spanning more than one experiment
            if e2 == -1 and r1 == 0:
                return f"exp {e1} - forever"
            elif e2 == -1:
                return f"exp {e1}, run {r1} - forever"
            elif r1 == 0 and r2 == -1:
                return f"exp {e1}-{e2}, all runs"
            elif r2 == -1:
                return f"exp {e1}, run {r1} - exp {e2}, all runs"
            else:
                return f"exp {e1}, run {r1} - exp {e2}, run {r2}"
229 
230 
232  """Class to interface conditions db REST interface"""
233 
234 
235  BASE_URLS = [conditions.default_metadata_provider_url]
236 
237  class RequestError(RuntimeError):
238  """Class to be thrown by request() if there is any error"""
239  pass
240 
241  @staticmethod
242  def get_base_urls(given_url):
243  """Resolve the list of server urls. If a url is given just return it.
244  Otherwise return servers listed in BELLE2_CONDB_SERVERLIST or the
245  builtin defaults
246 
247  Arguments:
248  given_url (str): Explicit base_url. If this is not None it will be
249  returned as is in a list of length 1
250 
251  Returns:
252  a list of urls to try for database connectivity
253  """
254 
255  base_url_list = ConditionsDB.BASE_URLS[:]
256  base_url_env = os.environ.get("BELLE2_CONDB_SERVERLIST", None)
257  if given_url is not None:
258  base_url_list = [given_url]
259  elif base_url_env is not None:
260  base_url_list = base_url_env.split()
261  B2INFO("Getting Conditions Database servers from Environment:")
262  for i, url in enumerate(base_url_list, 1):
263  B2INFO(f" {i}. {url}")
264  # try to escalate to https for all given urls
265  full_list = []
266  for url in base_url_list:
267  if url.startswith("http://"):
268  full_list.append("https" + url[4:])
269  # but keep the http in case of connection problems
270  full_list.append(url)
271  return full_list
272 
273  def __init__(self, base_url=None, max_connections=10, retries=3):
274  """
275  Create a new instance of the interface
276 
277  Args:
278  base_url (string): base url of the rest interface
279  max_connections (int): number of connections to keep open, mostly useful for threaded applications
280  retries (int): number of retries in case of connection problems
281  """
282 
283 
284  self._session_session = requests.Session()
285  # and set the connection options we want to have
286  adapter = requests.adapters.HTTPAdapter(
287  pool_connections=max_connections, pool_maxsize=max_connections,
288  max_retries=retries, pool_block=True
289  )
290  self._session_session.mount("http://", adapter)
291  self._session_session.mount("https://", adapter)
292  # and also set the proxy settings
293  if "BELLE2_CONDB_PROXY" in os.environ:
294  self._session_session.proxies = {
295  "http": os.environ.get("BELLE2_CONDB_PROXY"),
296  "https": os.environ.get("BELLE2_CONDB_PROXY"),
297  }
298  # test the given url or try the known defaults
299  base_url_list = ConditionsDB.get_base_urls(base_url)
300 
301  for url in base_url_list:
302 
303  self._base_url_base_url = url.rstrip("/") + "/"
304  try:
305  req = self._session_session.request("HEAD", self._base_url_base_url + "v2/globalTags")
306  req.raise_for_status()
307  except requests.RequestException as e:
308  B2WARNING(f"Problem connecting to {url}:\n {e}\n Trying next server ...")
309  else:
310  break
311  else:
312  B2FATAL("No working database servers configured, giving up")
313 
314  # We have a working server so change the api to return json instead of
315  # xml, much easier in python, also request non-cached replies. We do
316  # this now because for the server check above we're fine with cached
317  # results.
318  self._session_session.headers.update({"Accept": "application/json", "Cache-Control": "no-cache"})
319 
320  def set_authentication_token(self, token):
321  """
322  Set authentication token when talking to the database
323 
324  Args:
325  token (str): JWT to hand to the database. Will not be checked
326  for validity here.
327  """
328  self._session_session.auth = BearerAuth(token)
329 
330  def request(self, method, url, message=None, *args, **argk):
331  """
332  Request function, similar to requests.request but adding the base_url
333 
334  Args:
335  method (str): GET, POST, etc.
336  url (str): url for the request, base_url will be prepended
337  message (str): message to show when starting the request and if it fails
338 
339  All other arguments will be forwarded to requests.request.
340  """
341  if message is not None:
342  B2INFO(message)
343 
344  try:
345  req = self._session_session.request(method, self._base_url_base_url + "v2/" + url.lstrip("/"), *args, **argk)
346  except requests.exceptions.ConnectionError as e:
347  B2FATAL("Could not access '" + self._base_url_base_url + url.lstrip("/") + "': " + str(e))
348 
349  if req.status_code >= 300:
350  # Apparently something is not good. Let's try to decode the json
351  # reply containing reason and message
352  try:
353  response = req.json()
354  message = response.get("message", "")
355  colon = ": " if message.strip() else ""
356  error = f"Request {method} {url} returned {response['code']} {response['reason']}{colon}{message}"
357  except json.JSONDecodeError:
358  # seems the reply was not even json
359  error = f"Request {method} {url} returned non JSON response {req.status_code}: {req.content}"
360 
361  if message is not None:
362  raise ConditionsDB.RequestError(f"{message} failed: {error}")
363  else:
364  raise ConditionsDB.RequestError(error)
365 
366  if method != "HEAD" and req.status_code != requests.codes.no_content:
367  try:
368  req.json()
369  except json.JSONDecodeError as e:
370  B2INFO(f"Invalid response: {req.content}")
371  raise ConditionsDB.RequestError(f"{method} {url} returned invalid JSON response {e}")
372  return req
373 
374  def get_globalTags(self):
375  """Get a list of all globaltags. Returns a dictionary with the globaltag
376  names and the corresponding ids in the database"""
377 
378  try:
379  req = self.requestrequest("GET", "/globalTags")
380  except ConditionsDB.RequestError as e:
381  B2ERROR(f"Could not get the list of globaltags: {e}")
382  return None
383 
384  result = {}
385  for tag in req.json():
386  result[tag["name"]] = tag
387 
388  return result
389 
390  def has_globalTag(self, name):
391  """Check whether the globaltag with the given name exists."""
392 
393  try:
394  self.requestrequest("GET", f"/globalTag/{encode_name(name)}")
396  return False
397 
398  return True
399 
400  def get_globalTagInfo(self, name):
401  """Get the id of the globaltag with the given name. Returns either the
402  id or None if the tag was not found"""
403 
404  try:
405  req = self.requestrequest("GET", f"/globalTag/{encode_name(name)}")
406  except ConditionsDB.RequestError as e:
407  B2ERROR(f"Cannot find globaltag '{name}': {e}")
408  return None
409 
410  return req.json()
411 
412  def get_globalTagType(self, name):
413  """
414  Get the dictionary describing the given globaltag type (currently
415  one of DEV or RELEASE). Returns None if tag type was not found.
416  """
417  try:
418  req = self.requestrequest("GET", "/globalTagType")
419  except ConditionsDB.RequestError as e:
420  B2ERROR(f"Could not get list of valid globaltag types: {e}")
421  return None
422 
423  types = {e["name"]: e for e in req.json()}
424 
425  if name in types:
426  return types[name]
427 
428  B2ERROR(f"Unknown globaltag type: '{name}', please use one of {', '.join(types)}")
429  return None
430 
431  def create_globalTag(self, name, description, user):
432  """
433  Create a new globaltag
434  """
435  info = {"name": name, "description": description, "modifiedBy": user, "isDefault": False}
436  try:
437  req = self.requestrequest("POST", "/globalTag/DEV", f"Creating globaltag {name}", json=info)
438  except ConditionsDB.RequestError as e:
439  B2ERROR(f"Could not create globaltag {name}: {e}")
440  return None
441 
442  return req.json()
443 
444  def get_all_iovs(self, globalTag, exp=None, run=None, message=None, run_range=None, fully_contained=False):
445  """
446  Return list of all payloads in the given globaltag where each element is
447  a `PayloadInformation` instance
448 
449  Parameters:
450  gobalTag (str): name of the globaltag
451  exp (int): if given limit the list of payloads to the ones valid for
452  the given exp,run combination
453  run (int): if given limit the list of payloads to the ones valid for
454  the given exp,run combination
455  message (str): additional message to show when downloading the
456  payload information. Will be directly appended to
457  "Obtaining lists of iovs for globaltag {globalTag}"
458  run_range (tuple): if given limit the list of payloads to the ones
459  overlapping with the given run range, if
460  fully_contained (bool): if True and the run_range is not None it limits
461  the list of payloads to the ones fully contained in the given run range
462 
463  Warning:
464  Both, exp and run, need to be given at the same time. Just supplying
465  an experiment or a run number will not work
466  """
467  globalTag = encode_name(globalTag)
468  if message is None:
469  message = ""
470  if run_range is not None:
471  if fully_contained:
472  message += f" [fully contained in {tuple(run_range)}]"
473  else:
474  message += f" [valid in {tuple(run_range)}]"
475  run_range = IntervalOfValidity(run_range)
476 
477  if exp is not None:
478  msg = f"Obtaining list of iovs for globaltag {globalTag}, exp={exp}, run={run}{message}"
479  req = self.requestrequest("GET", "/iovPayloads", msg, params={'gtName': globalTag, 'expNumber': exp, 'runNumber': run})
480  else:
481  msg = f"Obtaining list of iovs for globaltag {globalTag}{message}"
482  req = self.requestrequest("GET", f"/globalTag/{globalTag}/globalTagPayloads", msg)
483  all_iovs = []
484  for item in req.json():
485  payload = item["payload" if 'payload' in item else "payloadId"]
486  if "payloadIov" in item:
487  iovs = [item['payloadIov']]
488  else:
489  iovs = item['payloadIovs']
490 
491  for iov in iovs:
492  if run_range is not None:
493  iov_ = IntervalOfValidity(iov['expStart'], iov['runStart'], iov['expEnd'], iov['runEnd'])
494  if fully_contained:
495  if not iov_ & run_range == iov_:
496  continue
497  else:
498  if iov_ & run_range is None:
499  continue
500  all_iovs.append(PayloadInformation.from_json(payload, iov))
501 
502  all_iovs.sort()
503  return all_iovs
504 
505  def get_payloads(self, global_tag=None):
506  """
507  Get a list of all defined payloads (for the given global_tag or by default for all).
508  Returns a dictionary which maps (module, checksum) to the payload id.
509  """
510 
511  try:
512  if global_tag:
513  req = self.requestrequest("GET", f"/globalTag/{encode_name(global_tag)}/payloads")
514  else:
515  req = self.requestrequest("GET", "/payloads")
516  except ConditionsDB.RequestError as e:
517  B2ERROR(f"Cannot get list of payloads: {e}")
518  return {}
519 
520  result = {}
521  for payload in req.json():
522  module = payload["basf2Module"]["name"]
523  checksum = payload["checksum"]
524  result[(module, checksum)] = payload["payloadId"]
525 
526  return result
527 
528  def check_payloads(self, payloads, information="payloadId"):
529  """
530  Check for the existence of payloads in the database.
531 
532  Arguments:
533  payloads (list((str,str))): A list of payloads to check for. Each
534  payload needs to be a tuple of the name of the payload and the
535  md5 checksum of the payload file.
536  information (str): The information to be extracted from the
537  payload dictionary
538 
539  Returns:
540  A dictionary with the payload identifiers (name, checksum) as keys
541  and the requested information as values for all payloads which are already
542  present in the database.
543  """
544 
545  search_query = [{"name": e[0], "checksum": e[1]} for e in payloads]
546  try:
547  req = self.requestrequest("POST", "/checkPayloads", json=search_query)
548  except ConditionsDB.RequestError as e:
549  B2ERROR(f"Cannot check for existing payloads: {e}")
550  return {}
551 
552  result = {}
553  for payload in req.json():
554  module = payload["basf2Module"]["name"]
555  checksum = payload["checksum"]
556  result[(module, checksum)] = payload[information]
557 
558  return result
559 
560  def get_revisions(self, entries):
561  """
562  Get the revision numbers of payloads in the database.
563 
564  Arguments:
565  entries (list): A list of payload entries.
566  Each entry must have the attributes module and checksum.
567 
568  Returns:
569  True if successful.
570  """
571 
572  result = self.check_payloadscheck_payloads([(entry.module, entry.checksum) for entry in entries], "revision")
573  if not result:
574  return False
575 
576  for entry in entries:
577  entry.revision = result.get((entry.module, entry.checksum), 0)
578 
579  return True
580 
581  def create_payload(self, module, filename, checksum=None):
582  """
583  Create a new payload
584 
585  Args:
586  module (str): name of the module
587  filename (str): name of the file
588  checksum (str): md5 hexdigest of the file. Will be calculated automatically if not given
589  """
590  if checksum is None:
591  checksum = file_checksum(filename)
592 
593  # this is the only complicated request as we have to provide a
594  # multipart/mixed request which is not directly provided by the request
595  # library.
596  files = [
597  (filename, open(filename, "rb").read(), "application/x-root"),
598  ("json", json.dumps({"checksum": checksum, "isDefault": False}), "application/json"),
599  ]
600  # ok we have the two "files" we want to send, create multipart/mixed
601  # body
602  fields = []
603  for name, contents, mimetype in files:
604  rf = RequestField(name=name, data=contents)
605  rf.make_multipart(content_type=mimetype)
606  fields.append(rf)
607 
608  post_body, content_type = encode_multipart_formdata(fields)
609  content_type = ''.join(('multipart/mixed',) + content_type.partition(';')[1:])
610  headers = {'Content-Type': content_type}
611 
612  # now make the request. Note to self: if multipart/form-data would be
613  # accepted this would be so much nicer here. but it works.
614  try:
615  req = self.requestrequest("POST", f"/package/dbstore/module/{encode_name(module)}/payload", data=post_body, headers=headers)
616  except ConditionsDB.RequestError as e:
617  B2ERROR(f"Could not create Payload: {e}")
618  return None
619 
620  return req.json()["payloadId"]
621 
622  def create_iov(self, globalTagId, payloadId, firstExp, firstRun, finalExp, finalRun):
623  """
624  Create an iov.
625 
626  Args:
627  globalTagId (int): id of the globaltag, obtain with get_globalTagId()
628  payloadId (int): id of the payload, obtain from create_payload() or get_payloads()
629  firstExp (int): first experiment for which this iov is valid
630  firstRun (int): first run for which this iov is valid
631  finalExp (int): final experiment for which this iov is valid
632  finalRun (int): final run for which this iov is valid
633 
634  Returns:
635  payloadIovId of the created iov, None if creation was not successful
636  """
637  try:
638  # try to convert all arguments except self to integers to make sure they are
639  # valid.
640  local_variables = locals()
641  variables = {e: int(local_variables[e]) for e in
642  ["globalTagId", "payloadId", "firstExp", "firstRun", "finalExp", "finalRun"]}
643  except ValueError:
644  B2ERROR("create_iov: All parameters need to be integers")
645  return None
646 
647  # try to create the iov
648  try:
649  req = self.requestrequest("POST", "/globalTagPayload/{globalTagId},{payloadId}"
650  "/payloadIov/{firstExp},{firstRun},{finalExp},{finalRun}".format(**variables))
651  except ConditionsDB.RequestError as e:
652  B2ERROR(f"Could not create IOV: {e}")
653  return None
654 
655  return req.json()["payloadIovId"]
656 
657  def delete_iov(self, iovId):
658  """Delete an iov
659 
660  Args:
661  iovId (int): id of the iov to be deleted
662  """
663  try:
664  self.requestrequest("DELETE", f"/payloadIov/{iovId}")
665  except ConditionsDB.RequestError as e:
666  B2ERROR(f"Could not delete IOV: {e}")
667 
668  def modify_iov(self, iovId, firstExp, firstRun, finalExp, finalRun):
669  """Modify the validity range of a given iov
670 
671  Args:
672  iovId (int): id of the iov to be modified
673  firstExp (int): first experiment for which this iov is valid
674  firstRun (int): first run for which this iov is valid
675  finalExp (int): final experiment for which this iov is valid
676  finalRun (int): final run for which this iov is valid
677  """
678  try:
679  querystring = {"expStart": str(firstExp), "runStart": str(firstRun), "expEnd": str(finalExp), "runEnd": str(finalRun)}
680  self.requestrequest("PUT", f"/payloadIov/{iovId}", params=querystring)
681  except ConditionsDB.RequestError as e:
682  B2ERROR(f"Could not modify IOV: {e}")
683 
684  def get_iovs(self, globalTagName, payloadName=None):
685  """Return existing iovs for a given tag name. It returns a dictionary
686  which maps (payloadId, first runId, final runId) to iovId
687 
688  Parameters:
689  globalTagName(str): Global tag name.
690  payloadName(str): Payload name (if None, selection by name is
691  not performed.
692  """
693 
694  try:
695  req = self.requestrequest("GET", f"/globalTag/{encode_name(globalTagName)}/globalTagPayloads")
697  # there could be just no iovs so no error
698  return {}
699 
700  result = {}
701  for payload in req.json():
702  payloadId = payload["payloadId"]["payloadId"]
703  if payloadName is not None:
704  if payload["payloadId"]["basf2Module"]["name"] != payloadName:
705  continue
706  for iov in payload["payloadIovs"]:
707  iovId = iov["payloadIovId"]
708  firstExp, firstRun = iov["expStart"], iov["runStart"]
709  finalExp, finalRun = iov["expEnd"], iov["runEnd"]
710  result[(payloadId, firstExp, firstRun, finalExp, finalRun)] = iovId
711 
712  return result
713 
714  def upload(self, filename, global_tag, normalize=False, ignore_existing=False, nprocess=1, uploaded_entries=None):
715  """
716  Upload a testing payload storage to the conditions database.
717 
718  Parameters:
719  filename (str): filename of the testing payload storage file that should be uploaded
720  global_tage (str): name of the globaltag to which the data should be uploaded
721  normalize (Union[bool, str]): if True the payload root files will be normalized to have the same checksum for the
722  same content, if normalize is a string in addition the file name in the root file metadata will be set to it
723  ignore_existing (bool): if True do not upload payloads that already exist
724  nprocess (int): maximal number of parallel uploads
725  uploaded_entries (list): the list of successfully uploaded entries
726 
727  Returns:
728  True if the upload was successful
729  """
730 
731  # first create a list of payloads
732  from conditions_db.testing_payloads import parse_testing_payloads_file
733  B2INFO(f"Reading payload list from {filename}")
734  entries = parse_testing_payloads_file(filename)
735  if entries is None:
736  B2ERROR(f"Problems with testing payload storage file {filename}, exiting")
737  return False
738 
739  if not entries:
740  B2INFO(f"No payloads found in {filename}, exiting")
741  return True
742 
743  B2INFO(f"Found {len(entries)} iovs to upload")
744 
745  # time to get the id for the globaltag
746  tagId = self.get_globalTagInfoget_globalTagInfo(global_tag)
747  if tagId is None:
748  return False
749  tagId = tagId["globalTagId"]
750 
751  # now we could have more than one payload with the same iov so let's go over
752  # it again and remove duplicates but keep the last one for each
753  entries = sorted(set(reversed(entries)))
754 
755  if normalize:
756  name = normalize if normalize is not True else None
757  for e in entries:
758  e.normalize(name=name)
759 
760  # so let's have a list of all payloads (name, checksum) as some payloads
761  # might have multiple iovs. Each payload gets a list of all of those
762  payloads = defaultdict(list)
763  for e in entries:
764  payloads[(e.module, e.checksum)].append(e)
765 
766  existing_payloads = {}
767  existing_iovs = {}
768 
769  def upload_payload(item):
770  """Upload a payload file if necessary but first check list of existing payloads"""
771  key, entries = item
772  if key in existing_payloads:
773  B2INFO(f"{key[0]} (md5:{key[1]}) already existing in database, skipping.")
774  payload_id = existing_payloads[key]
775  else:
776  entry = entries[0]
777  payload_id = self.create_payloadcreate_payload(entry.module, entry.filename, entry.checksum)
778  if payload_id is None:
779  return False
780 
781  B2INFO(f"Created new payload {payload_id} for {entry.module} (md5:{entry.checksum})")
782 
783  for entry in entries:
784  entry.payload = payload_id
785 
786  return True
787 
788  def create_iov(entry):
789  """Create an iov if necessary but first check the list of existing iovs"""
790  if entry.payload is None:
791  return None
792 
793  iov_key = (entry.payload,) + entry.iov_tuple
794  if iov_key in existing_iovs:
795  entry.iov = existing_iovs[iov_key]
796  B2INFO(f"IoV {entry.iov_tuple} for {entry.module} (md5:{entry.checksum}) already existing in database, skipping.")
797  else:
798  entry.payloadIovId = self.create_iovcreate_iov(tagId, entry.payload, *entry.iov_tuple)
799  if entry.payloadIovId is None:
800  return False
801 
802  B2INFO(f"Created IoV {entry.iov_tuple} for {entry.module} (md5:{entry.checksum})")
803 
804  return entry
805 
806  # multithreading for the win ...
807  with ThreadPoolExecutor(max_workers=nprocess) as pool:
808  # if we want to check for existing payloads/iovs we schedule the download of
809  # the full payload list. And write a message as each completes
810  if not ignore_existing:
811  B2INFO("Downloading information about existing payloads and iovs...")
812  futures = []
813  existing_iovs = {}
814  existing_payloads = {}
815 
816  def create_future(iter, func, callback=None):
817  fn = pool.submit(iter, func)
818  if callback is not None:
819  fn.add_done_callback(callback)
820  futures.append(fn)
821 
822  def update_iovs(iovs):
823  existing_iovs.update(iovs.result())
824  B2INFO(f"Found {len(existing_iovs)} existing iovs in {global_tag}")
825 
826  def update_payloads(payloads):
827  existing_payloads.update(payloads.result())
828  B2INFO(f"Found {len(existing_payloads)} existing payloads")
829 
830  create_future(self.get_iovsget_iovs, global_tag, update_iovs)
831  # checking existing payloads should not be done with too many at once
832  for chunk in chunks(payloads.keys(), 1000):
833  create_future(self.check_payloadscheck_payloads, chunk, update_payloads)
834 
835  futures_wait(futures)
836 
837  # upload payloads
838  failed_payloads = sum(0 if result else 1 for result in pool.map(upload_payload, payloads.items()))
839  if failed_payloads > 0:
840  B2ERROR(f"{failed_payloads} payloads could not be uploaded")
841 
842  # create IoVs
843  failed_iovs = 0
844  for entry in pool.map(create_iov, entries):
845  if entry:
846  if uploaded_entries is not None:
847  uploaded_entries.append(entry)
848  else:
849  failed_iovs += 1
850  if failed_iovs > 0:
851  B2ERROR(f"{failed_iovs} IoVs could not be created")
852 
853  # update revision numbers
854  if uploaded_entries is not None:
855  self.get_revisionsget_revisions(uploaded_entries)
856 
857  return failed_payloads + failed_iovs == 0
858 
def staging_request(self, filename, normalize, data, password):
    """
    Upload a testing payload storage to a staging globaltag and create or update a jira issue

    Note that ``data['tag']`` is overwritten with the name of the staging globaltag.

    Parameters:
      filename (str): filename of the testing payload storage file that should be uploaded
      normalize (Union[bool, str]): if True the payload root files will be
        normalized to have the same checksum for the same content, if
        normalize is a string in addition the file name in the root file
        metadata will be set to it
      data (dict): a dictionary with the information provided by the user:

        * task: category of globaltag, either main, online, prompt, data, mc, or analysis
        * tag: the globaltag name
        * request: type of request, either Update, New, or Modification. The latter two imply task == main because
          if new payload classes are introduced or payload classes are modified then they will first be included in
          the main globaltag. Here a synchronization of code and payload changes has to be managed.
          If new or modified payload classes should be included in other globaltags they must already be in a release.
        * pull-request: number of the pull request containing new or modified payload classes,
          only for request == New or Modified
        * backward-compatibility: description of what happens if the old payload is encountered by the updated code,
          only for request == Modified
        * forward-compatibility: description of what happens if a new payload is encountered by the existing code,
          only for request == Modified
        * release: the required release version
        * reason: the reason for the request
        * details: a detailed description for the globaltag manager (string or sequence of strings
          that is concatenated into the issue text)
        * data_taking: impact on data taking (same format as details), only read for task == online
        * issue: identifier of an existing jira issue (optional)
        * user: name of the user
        * time: time stamp of the request

      password: the password for access to jira or the access token and secret for oauth access

    Returns:
      True if the upload and jira issue creation/upload was successful, False otherwise
    """

    def status_name(code):
        """Return the short name requests has for an HTTP status code, or the number if it is unknown."""
        names = requests.status_codes._codes.get(code)
        return names[0] if names else str(code)

    # determine the staging globaltag name
    data['tag'] = upload_global_tag(data['task'])
    if data['tag'] is None:
        data['tag'] = f"temp_{data['task']}_{data['user']}_{data['time']}"

    # create the staging globaltag if it does not exists yet
    if not self.has_globalTag(data['tag']):
        if not self.create_globalTag(data['tag'], data['reason'], data['user']):
            return False

    # upload the payloads
    B2INFO(f"Uploading testing database {filename} to globaltag {data['tag']}")
    entries = []
    if not self.upload(filename, data['tag'], normalize, uploaded_entries=entries):
        return False

    # get the dictionary for the jira issue creation/update;
    # 'issue' is optional, so a missing key means "create a new issue"
    issue = data.get('issue')
    if not issue:
        issue = jira_global_tag_v2(data['task'])
        if issue is None:
            issue = {"components": [{"name": "globaltag"}]}

    # create jira issue text from provided information
    if isinstance(issue, tuple):
        # (issue configuration, description template) pair
        description = issue[1].format(**data)
        issue = issue[0]
    else:
        description = f"""
|*Upload globaltag* | {data['tag']} |
|*Request reason* | {data['reason']} |
|*Required release* | {data['release']} |
|*Type of request* | {data['request']} |
"""
        if 'pull-request' in data:
            description += f"|*Pull request* | \\#{data['pull-request']} |\n"
        if 'backward-compatibility' in data:
            description += f"|*Backward compatibility* | \\#{data['backward-compatibility']} |\n"
        if 'forward-compatibility' in data:
            description += f"|*Forward compatibility* | \\#{data['forward-compatibility']} |\n"
        description += '|*Details* |' + ''.join(data['details']) + ' |\n'
        if data['task'] == 'online':
            description += '|*Impact on data taking*|' + ''.join(data['data_taking']) + ' |\n'

    # add information about uploaded payloads/IoVs
    description += '\nPayloads\n||Name||Revision||IoV||\n'
    description += ''.join(f"|{entry.module} | {entry.revision} | ({entry.iov_str()}) |\n" for entry in entries)

    # create a new issue
    if isinstance(issue, dict):
        # work on a copy so that the configuration returned by jira_global_tag_v2 is not modified
        issue = dict(issue)
        issue["description"] = description
        if "summary" in issue:
            issue["summary"] = issue["summary"].format(**data)
        else:
            issue["summary"] = f"Globaltag request for {data['task']} by {data['user']} at {data['time']}"
        issue.setdefault("project", {"key": "BII"})
        issue.setdefault("issuetype", {"name": "Task"})
        if data["task"] == "main":
            issue["labels"] = ["TUPPR"]

        B2INFO(f"Creating jira issue for {data['task']} globaltag request")
        if isinstance(password, str):
            # direct access to jira with user name and password
            response = requests.post('https://agira.desy.de/rest/api/latest/issue', auth=(data['user'], password),
                                     json={'fields': issue})
        else:
            # access via the relay script, optionally with oauth token and secret
            fields = {'issue': json.dumps(issue)}
            if 'user' in data:
                fields['user'] = data['user']
            if password:
                fields['token'] = password[0]
                fields['secret'] = password[1]
            response = requests.post('https://b2-master.belle2.org/cgi-bin/jira_issue.py', data=fields)
        if response.status_code in range(200, 210):
            B2INFO(f"Issue successfully created: https://agira.desy.de/browse/{response.json()['key']}")
        else:
            B2ERROR('The creation of the issue failed: ' + status_name(response.status_code))
            return False

    # comment on an existing issue
    else:
        # Let's make sure all assignees of new issues are added as watchers
        # in that case, otherwise they might never find out
        new_issue_config = jira_global_tag_v2(data['task'])
        if isinstance(new_issue_config, dict) and "assignee" in new_issue_config:
            watcher = new_issue_config['assignee'].get('name', None)
            if watcher is not None and isinstance(password, str):
                response = requests.post(f'https://agira.desy.de/rest/api/latest/issue/{issue}/watchers',
                                         auth=(data['user'], password), json=watcher)
                if response.status_code in range(200, 210):
                    B2INFO(f"Added {watcher} as watcher to {issue}")
                else:
                    B2WARNING(f"Could not add {watcher} as watcher to {issue}: {response.status_code}")

        B2INFO(f"Commenting on jira issue {issue} for {data['task']} globaltag request")
        if isinstance(password, str):
            response = requests.post(f'https://agira.desy.de/rest/api/latest/issue/{issue}/comment',
                                     auth=(data['user'], password), json={'body': description})
        else:
            # NOTE(review): the requesting user is sent here, as in the issue creation above. Previously
            # the assignee name was sent when one was configured, and the name was unbound otherwise.
            fields = {'id': issue, 'comment': description}
            if 'user' in data:
                fields['user'] = data['user']
            if password:
                fields['token'] = password[0]
                fields['secret'] = password[1]
            response = requests.post('https://b2-master.belle2.org/cgi-bin/jira_issue.py', data=fields)
        if response.status_code in range(200, 210):
            B2INFO(f"Issue successfully updated: https://agira.desy.de/browse/{issue}")
        else:
            B2ERROR('The commenting of the issue failed: ' + status_name(response.status_code))
            return False

    return True
1010 
1011 
def require_database_for_test(timeout=60, base_url=None):
    """Make sure that the database is available and skip the test if not.

    This function should be called in test scripts if they are expected to fail
    if the database is down. It either returns when the database is ok or it
    will signal test_basf2 that the test should be skipped and exit

    Parameters:
      timeout (float): timeout in seconds passed to each connection attempt,
        so an unresponsive server cannot block the test indefinitely
      base_url (str): url passed to ``ConditionsDB.get_base_urls`` to obtain
        the list of servers to try

    Raises:
      Exception: if the environment variable ``BELLE2_CONDB_GLOBALTAG`` is set
        to an empty string
    """
    import sys
    if os.environ.get("BELLE2_CONDB_GLOBALTAG", None) == "":
        raise Exception("Access to the Database is disabled")
    base_url_list = ConditionsDB.get_base_urls(base_url)
    for url in base_url_list:
        try:
            # a timeout raises requests.Timeout, which is a RequestException and handled below
            req = requests.request("HEAD", url.rstrip('/') + "/v2/globalTags", timeout=timeout)
            req.raise_for_status()
        except requests.RequestException as e:
            B2WARNING(f"Problem connecting to {url}:\n {e}\n Trying next server ...")
        else:
            # first working server is enough
            break
    else:
        # loop finished without break: no server answered successfully
        print("TEST SKIPPED: No working database servers configured, giving up", file=sys.stderr)
        sys.exit(1)
1034 
1035 
def enable_debugging():
    """Switch on verbose logging of the http traffic done by python-requests.

    Sets the debug level of ``http.client.HTTPConnection`` to 1, initializes
    the logging module and sets the root logger as well as the
    "requests.packages.urllib3" logger to DEBUG level.
    """
    import logging
    from http.client import HTTPConnection

    # debugging at httplib level (requests->urllib3->http.client)
    HTTPConnection.debuglevel = 1

    # logging has to be initialized before the loggers are configured
    logging.basicConfig()
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)

    urllib3_logger = logging.getLogger("requests.packages.urllib3")
    urllib3_logger.setLevel(logging.DEBUG)
    urllib3_logger.propagate = True
def __call__(self, r)
Definition: __init__.py:130
_authtoken
Authorization header to send with each request.
Definition: __init__.py:128
def __init__(self, token)
Definition: __init__.py:125
def get_payloads(self, global_tag=None)
Definition: __init__.py:505
def get_iovs(self, globalTagName, payloadName=None)
Definition: __init__.py:684
def upload(self, filename, global_tag, normalize=False, ignore_existing=False, nprocess=1, uploaded_entries=None)
Definition: __init__.py:714
def set_authentication_token(self, token)
Definition: __init__.py:320
def delete_iov(self, iovId)
Definition: __init__.py:657
def get_all_iovs(self, globalTag, exp=None, run=None, message=None, run_range=None, fully_contained=False)
Definition: __init__.py:444
def get_globalTagType(self, name)
Definition: __init__.py:412
def check_payloads(self, payloads, information="payloadId")
Definition: __init__.py:528
def has_globalTag(self, name)
Definition: __init__.py:390
def create_iov(self, globalTagId, payloadId, firstExp, firstRun, finalExp, finalRun)
Definition: __init__.py:622
def request(self, method, url, message=None, *args, **argk)
Definition: __init__.py:330
def __init__(self, base_url=None, max_connections=10, retries=3)
Definition: __init__.py:273
def get_revisions(self, entries)
Definition: __init__.py:560
def create_globalTag(self, name, description, user)
Definition: __init__.py:431
def get_globalTagInfo(self, name)
Definition: __init__.py:400
_session
session object to get keep-alive support and connection pooling
Definition: __init__.py:284
def get_base_urls(given_url)
Definition: __init__.py:242
def modify_iov(self, iovId, firstExp, firstRun, finalExp, finalRun)
Definition: __init__.py:668
def staging_request(self, filename, normalize, data, password)
Definition: __init__.py:859
def create_payload(self, module, filename, checksum=None)
Definition: __init__.py:581
_base_url
base url to be prepended to all requests
Definition: __init__.py:303
payload_id
payload id in CDB, not used for comparisons
Definition: __init__.py:175
iov_id
iov id in CDB, not used for comparisons
Definition: __init__.py:177
name
name of the payload
Definition: __init__.py:167
checksum
checksum of the payload
Definition: __init__.py:169
def __init__(self, payload_id, name, revision, checksum, payload_url, base_url, iov_id=None, iov=None)
Definition: __init__.py:162
def from_json(cls, payload, iov=None)
Definition: __init__.py:141
revision
revision, not used for comparisons
Definition: __init__.py:173
iov
interval of validity
Definition: __init__.py:171