Belle II Software  release-05-01-25
__init__.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 """
5 conditions_db
6 -------------
7 
8 Python interface to the ConditionsDB
9 """
10 
11 import os
12 from basf2 import B2FATAL, B2ERROR, B2INFO, B2WARNING
13 import requests
14 from requests.auth import HTTPBasicAuth, HTTPDigestAuth
15 from requests.packages.urllib3.fields import RequestField
16 from requests.packages.urllib3.filepost import encode_multipart_formdata
17 import json
18 import urllib
19 from versioning import upload_global_tag, jira_global_tag_v2
20 from collections import defaultdict
21 from concurrent.futures import ThreadPoolExecutor, wait as futures_wait
22 import hashlib
23 import itertools
24 
25 
def encode_name(name):
    """Percent-encode ``name`` so that it can be embedded in an url.

    No character is declared safe, so path separators like ``/`` are escaped
    as well.
    """
    escaped = urllib.parse.quote(name, safe="")
    return escaped
29 
30 
def file_checksum(filename):
    """Calculate the md5 hash of a file.

    The file is read in fixed-size pieces so that large payload files never
    have to be held in memory completely.

    Arguments:
        filename (str): path of the file to hash

    Returns:
        the md5 hex digest of the file contents
    """
    md5hash = hashlib.md5()
    with open(filename, "rb") as data:
        # iter() with a sentinel stops at the first empty read, i.e. at EOF
        for block in iter(lambda: data.read(1024 * 1024), b""):
            md5hash.update(block)
    return md5hash.hexdigest()
37 
38 
def chunks(container, chunk_size):
    """Yield consecutive tuples of at most ``chunk_size`` items of an iterable.

    The last tuple may be shorter; nothing is yielded for an empty iterable.
    """
    source = iter(container)
    # each call of the lambda consumes the next piece of the iterator, the
    # empty tuple signals that nothing is left
    yield from iter(lambda: tuple(itertools.islice(source, chunk_size)), ())
47 
48 
class PayloadInformation:
    """Small container class to help compare payload information for efficient
    comparison between globaltags"""

    @classmethod
    def from_json(cls, payload, iov=None):
        """Set all internal members from the json information of the payload and the iov.

        Arguments:
            payload (dict): json information of the payload as returned by REST api
            iov (dict): json information of the iov as returned by REST api
        """
        if iov is None:
            # keep the tuple shape so that instances stay sortable among each other
            iov = {"payloadIovId": None, "expStart": None, "runStart": None, "expEnd": None, "runEnd": None}

        return cls(
            payload['payloadId'],
            payload['basf2Module']['name'],
            payload['revision'],
            payload['checksum'],
            payload['payloadUrl'],
            payload['baseUrl'],
            iov['payloadIovId'],
            (iov["expStart"], iov["runStart"], iov["expEnd"], iov["runEnd"]),
        )

    def __init__(self, payload_id, name, revision, checksum, payload_url, base_url, iov_id=None, iov=None):
        """
        Create a new object from the given information
        """
        #: name of the payload
        self.name = name
        #: checksum of the payload
        self.checksum = checksum
        #: interval of validity
        self.iov = iov
        #: revision, not used for comparisons
        self.revision = revision
        #: payload id in CDB, not used for comparisons
        self.payload_id = payload_id
        #: iov id in CDB, not used for comparisons
        self.iov_id = iov_id
        #: base url the payload url is relative to
        self.base_url = base_url
        #: payload url
        self.payload_url = payload_url

    @property
    def url(self):
        """Return the full url to the payload on the server"""
        return urllib.parse.urljoin(self.base_url + '/', self.payload_url)

    def __hash__(self):
        """Make object hashable"""
        return hash((self.name, self.checksum, self.iov))

    def __eq__(self, other):
        """Check if two payloads are equal (same name, checksum and iov)"""
        try:
            other_key = (other.name, other.checksum, other.iov)
        except AttributeError:
            # not something payload-like: let python fall back to its default
            # comparison instead of failing with an AttributeError
            return NotImplemented
        return (self.name, self.checksum, self.iov) == other_key

    def __lt__(self, other):
        """Sort payloads by name, iov, revision"""
        return (self.name.lower(), self.iov, self.revision) < (other.name.lower(), other.iov, other.revision)

    def readable_iov(self):
        """return a human readable name for the IoV"""
        # from_json() stores a tuple of four None values if no iov was given
        if self.iov is None or all(value is None for value in self.iov):
            return "none"

        if self.iov == (0, 0, -1, -1):
            return "always"

        e1, r1, e2, r2 = self.iov
        if e1 == e2:
            if r1 == 0 and r2 == -1:
                return f"exp {e1}"
            if r2 == -1:
                return f"exp {e1}, runs {r1}+"
            if r1 == r2:
                return f"exp {e1}, run {r1}"
            return f"exp {e1}, runs {r1} - {r2}"

        if e2 == -1 and r1 == 0:
            return f"exp {e1} - forever"
        if e2 == -1:
            return f"exp {e1}, run {r1} - forever"
        if r1 == 0 and r2 == -1:
            return f"exp {e1}-{e2}, all runs"
        if r2 == -1:
            return f"exp {e1}, run {r1} - exp {e2}, all runs"
        return f"exp {e1}, run {r1} - exp {e2}, run {r2}"
142 
143 
class ConditionsDB:
    """Class to interface conditions db REST interface"""

    #: builtin list of server urls, used if no url is given explicitly and
    #: BELLE2_CONDB_SERVERLIST is not set
    BASE_URLS = ["http://belle2db.sdcc.bnl.gov/b2s/rest/"]

    class RequestError(RuntimeError):
        """Class to be thrown by request() if there is any error"""
        pass
153 
154  @staticmethod
155  def get_base_urls(given_url):
156  """Resolve the list of server urls. If a url is given just return it.
157  Otherwise return servers listed in BELLE2_CONDB_SERVERLIST or the
158  builtin defaults
159 
160  Arguments:
161  given_url (str): Explicit base_url. If this is not None it will be
162  returned as is in a list of length 1
163 
164  Returns:
165  a list of urls to try for database connectivity
166  """
167 
168  base_url_list = ConditionsDB.BASE_URLS[:]
169  base_url_env = os.environ.get("BELLE2_CONDB_SERVERLIST", None)
170  if given_url is not None:
171  base_url_list = [given_url]
172  elif base_url_env is not None:
173  base_url_list = base_url_env.split()
174  B2INFO("Getting Conditions Database servers from Environment:")
175  for i, url in enumerate(base_url_list, 1):
176  B2INFO(f" {i}. {url}")
177  # try to escalate to https for all given urls
178  full_list = []
179  for url in base_url_list:
180  if url.startswith("http://"):
181  full_list.append("https" + url[4:])
182  # but keep the http in case of connection problems
183  full_list.append(url)
184  return full_list
185 
    def __init__(self, base_url=None, max_connections=10, retries=3):
        """
        Create a new instance of the interface

        A session is configured first, then the candidate servers returned by
        `get_base_urls` are probed one after the other with a HEAD request.
        The first one that answers without error is used for all requests;
        if none does, B2FATAL is called.

        Args:
            base_url (string): base url of the rest interface
            max_connections (int): number of connections to keep open, mostly useful for threaded applications
            retries (int): number of retries in case of connection problems
        """

        #: session object to get keep-alive support and connection pooling
        self._session = requests.Session()
        # and set the connection options we want to have
        adapter = requests.adapters.HTTPAdapter(
            pool_connections=max_connections, pool_maxsize=max_connections,
            max_retries=retries, pool_block=True
        )
        self._session.mount("http://", adapter)
        self._session.mount("https://", adapter)
        # and also set the proxy settings
        if "BELLE2_CONDB_PROXY" in os.environ:
            self._session.proxies = {
                "http": os.environ.get("BELLE2_CONDB_PROXY"),
                "https": os.environ.get("BELLE2_CONDB_PROXY"),
            }
        # test the given url or try the known defaults
        base_url_list = ConditionsDB.get_base_urls(base_url)

        for url in base_url_list:
            #: base url to be prepended to all requests
            self._base_url = url.rstrip("/") + "/"
            try:
                req = self._session.request("HEAD", self._base_url + "v2/globalTags")
                req.raise_for_status()
            except requests.RequestException as e:
                B2WARNING(f"Problem connecting to {url}:\n {e}\n Trying next server ...")
            else:
                # first server which answers without error wins
                break
        else:
            # only reached if the loop never hit the break above
            B2FATAL("No working database servers configured, giving up")

        # We have a working server so change the api to return json instead of
        # xml, much easier in python, also request non-cached replies. We do
        # this now because for the server check above we're fine with cached
        # results.
        self._session.headers.update({"Accept": "application/json", "Cache-Control": "no-cache"})
232 
233  def set_authentication(self, user, password, basic=True):
234  """
235  Set authentication credentials when talking to the database
236 
237  Args:
238  user (str): username
239  password (str): password
240  basic (bool): if True us HTTP Basic authentication, otherwise HTTP Digest
241  """
242  authtype = HTTPBasicAuth if basic else HTTPDigestAuth
243  self._session.auth = authtype(user, password)
244 
245  def request(self, method, url, message=None, *args, **argk):
246  """
247  Request function, similar to requests.request but adding the base_url
248 
249  Args:
250  method (str): GET, POST, etc.
251  url (str): url for the request, base_url will be prepended
252  message (str): message to show when starting the request and if it fails
253 
254  All other arguments will be forwarded to requests.request.
255  """
256  if message is not None:
257  B2INFO(message)
258 
259  try:
260  req = self._session.request(method, self._base_url + "v2/" + url.lstrip("/"), *args, **argk)
261  except requests.exceptions.ConnectionError as e:
262  B2FATAL("Could not access '" + self._base_url + url.lstrip("/") + "': " + str(e))
263 
264  if req.status_code >= 300:
265  # Apparently something is not good. Let's try to decode the json
266  # reply containing reason and message
267  try:
268  response = req.json()
269  message = response.get("message", "")
270  colon = ": " if message.strip() else ""
271  error = "Request {method} {url} returned {code} {reason}{colon}{message}".format(
272  method=method, url=url,
273  code=response["code"],
274  reason=response["reason"],
275  message=message,
276  colon=colon,
277  )
278  except json.JSONDecodeError:
279  # seems the reply was not even json
280  error = "Request {method} {url} returned non JSON response {code}: {content}".format(
281  method=method, url=url,
282  code=req.status_code,
283  content=req.content
284  )
285 
286  if message is not None:
287  raise ConditionsDB.RequestError("{} failed: {}".format(message, error))
288  else:
289  raise ConditionsDB.RequestError(error)
290 
291  if method != "HEAD" and req.status_code != requests.codes.no_content:
292  try:
293  req.json()
294  except json.JSONDecodeError as e:
295  B2INFO("Invalid response: {}".format(req.content))
296  raise ConditionsDB.RequestError("{method} {url} returned invalid JSON response {}"
297  .format(e, method=method, url=url))
298  return req
299 
300  def get_globalTags(self):
301  """Get a list of all globaltags. Returns a dictionary with the globaltag
302  names and the corresponding ids in the database"""
303 
304  try:
305  req = self.request("GET", "/globalTags")
306  except ConditionsDB.RequestError as e:
307  B2ERROR("Could not get the list of globaltags: {}".format(e))
308  return None
309 
310  result = {}
311  for tag in req.json():
312  result[tag["name"]] = tag
313 
314  return result
315 
316  def has_globalTag(self, name):
317  """Check whether the globaltag with the given name exists."""
318 
319  try:
320  self.request("GET", "/globalTag/{globalTagName}".format(globalTagName=encode_name(name)))
322  return False
323 
324  return True
325 
326  def get_globalTagInfo(self, name):
327  """Get the id of the globaltag with the given name. Returns either the
328  id or None if the tag was not found"""
329 
330  try:
331  req = self.request("GET", "/globalTag/{globalTagName}".format(globalTagName=encode_name(name)))
332  except ConditionsDB.RequestError as e:
333  B2ERROR("Cannot find globaltag '{}': {}".format(name, e))
334  return None
335 
336  return req.json()
337 
338  def get_globalTagType(self, name):
339  """
340  Get the dictionary describing the given globaltag type (currently
341  one of DEV or RELEASE). Returns None if tag type was not found.
342  """
343  try:
344  req = self.request("GET", "/globalTagType")
345  except ConditionsDB.RequestError as e:
346  B2ERROR("Could not get list of valid globaltag types: {}".format(e))
347  return None
348 
349  types = {e["name"]: e for e in req.json()}
350 
351  if name in types:
352  return types[name]
353 
354  B2ERROR("Unknown globaltag type: '{}', please use one of {}".format(name, ", ".join(types)))
355  return None
356 
357  def create_globalTag(self, name, description, user):
358  """
359  Create a new globaltag
360  """
361  info = {"name": name, "description": description, "modifiedBy": user, "isDefault": False}
362  try:
363  req = self.request("POST", f"/globalTag/DEV", f"Creating globaltag {name}", json=info)
364  except ConditionsDB.RequestError as e:
365  B2ERROR(f"Could not create globaltag {name}: {e}")
366  return None
367 
368  return req.json()
369 
370  def get_all_iovs(self, globalTag, exp=None, run=None, message=None):
371  """
372  Return list of all payloads in the given globaltag where each element is
373  a `PayloadInformation` instance
374 
375  Parameters:
376  gobalTag (str): name of the globaltag
377  exp (int): if given limit the list of payloads to the ones valid for
378  the given exp,run combination
379  run (int): if given limit the list of payloads to the ones valid for
380  the given exp,run combination
381  message (str): additional message to show when downloading the
382  payload information. Will be directly appended to
383  "Obtaining lists of iovs for globaltag {globalTag}"
384 
385  Warning:
386  Both, exp and run, need to be given at the same time. Just supplying
387  an experiment or a run number will not work
388  """
389  globalTag = encode_name(globalTag)
390  if message is None:
391  message = ""
392  if exp is not None:
393  msg = f"Obtaining list of iovs for globaltag {globalTag}, exp={exp}, run={run}{message}"
394  req = self.request("GET", "/iovPayloads", msg, params={'gtName': globalTag, 'expNumber': exp, 'runNumber': run})
395  else:
396  msg = f"Obtaining list of iovs for globaltag {globalTag}{message}"
397  req = self.request("GET", f"/globalTag/{globalTag}/globalTagPayloads", msg)
398  all_iovs = []
399  for item in req.json():
400  payload = item["payload" if 'payload' in item else "payloadId"]
401  if "payloadIov" in item:
402  iovs = [item['payloadIov']]
403  else:
404  iovs = item['payloadIovs']
405 
406  for iov in iovs:
407  all_iovs.append(PayloadInformation.from_json(payload, iov))
408 
409  all_iovs.sort()
410  return all_iovs
411 
412  def get_payloads(self, global_tag=None):
413  """
414  Get a list of all defined payloads (for the given global_tag or by default for all).
415  Returns a dictionary which maps (module, checksum) to the payload id.
416  """
417 
418  try:
419  if global_tag:
420  req = self.request("GET", "/globalTag/{global_tag}/payloads"
421  .format(global_tag=encode_name(global_tag)))
422  else:
423  req = self.request("GET", "/payloads")
424  except ConditionsDB.RequestError as e:
425  B2ERROR("Cannot get list of payloads: {}".format(e))
426  return {}
427 
428  result = {}
429  for payload in req.json():
430  module = payload["basf2Module"]["name"]
431  checksum = payload["checksum"]
432  result[(module, checksum)] = payload["payloadId"]
433 
434  return result
435 
436  def check_payloads(self, payloads, information="payloadId"):
437  """
438  Check for the existence of payloads in the database.
439 
440  Arguments:
441  payloads (list((str,str))): A list of payloads to check for. Each
442  payload needs to be a tuple of the name of the payload and the
443  md5 checksum of the payload file.
444  information (str): The information to be extracted from the
445  payload dictionary
446 
447  Returns:
448  A dictionary with the payload identifiers (name, checksum) as keys
449  and the requested information as values for all payloads which are already
450  present in the database.
451  """
452 
453  search_query = [{"name": e[0], "checksum": e[1]} for e in payloads]
454  try:
455  req = self.request("POST", "/checkPayloads", json=search_query)
456  except ConditionsDB.RequestError as e:
457  B2ERROR("Cannot check for existing payloads: {}".format(e))
458  return {}
459 
460  result = {}
461  for payload in req.json():
462  module = payload["basf2Module"]["name"]
463  checksum = payload["checksum"]
464  result[(module, checksum)] = payload[information]
465 
466  return result
467 
468  def get_revisions(self, entries):
469  """
470  Get the revision numbers of payloads in the database.
471 
472  Arguments:
473  entries (list): A list of payload entries.
474  Each entry must have the attributes module and checksum.
475 
476  Returns:
477  True if successful.
478  """
479 
480  result = self.check_payloads([(entry.module, entry.checksum) for entry in entries], "revision")
481  if not result:
482  return False
483 
484  for entry in entries:
485  entry.revision = result.get((entry.module, entry.checksum), 0)
486 
487  return True
488 
489  def create_payload(self, module, filename, checksum=None):
490  """
491  Create a new payload
492 
493  Args:
494  module (str): name of the module
495  filename (str): name of the file
496  checksum (str): md5 hexdigest of the file. Will be calculated automatically if not given
497  """
498  if checksum is None:
499  checksum = file_checksum(filename)
500 
501  # this is the only complicated request as we have to provide a
502  # multipart/mixed request which is not directly provided by the request
503  # library.
504  files = [
505  (filename, open(filename, "rb").read(), "application/x-root"),
506  ("json", json.dumps({"checksum": checksum, "isDefault": False}), "application/json"),
507  ]
508  # ok we have the two "files" we want to send, create multipart/mixed
509  # body
510  fields = []
511  for name, contents, mimetype in files:
512  rf = RequestField(name=name, data=contents)
513  rf.make_multipart(content_type=mimetype)
514  fields.append(rf)
515 
516  post_body, content_type = encode_multipart_formdata(fields)
517  content_type = ''.join(('multipart/mixed',) + content_type.partition(';')[1:])
518  headers = {'Content-Type': content_type}
519 
520  # now make the request. Note to self: if multipart/form-data would be
521  # accepted this would be so much nicer here. but it works.
522  try:
523  req = self.request("POST", "/package/dbstore/module/{moduleName}/payload"
524  .format(moduleName=encode_name(module)),
525  data=post_body, headers=headers)
526  except ConditionsDB.RequestError as e:
527  B2ERROR("Could not create Payload: {}".format(e))
528  return None
529 
530  return req.json()["payloadId"]
531 
532  def create_iov(self, globalTagId, payloadId, firstExp, firstRun, finalExp, finalRun):
533  """
534  Create an iov.
535 
536  Args:
537  globalTagId (int): id of the globaltag, obtain with get_globalTagId()
538  payloadId (int): id of the payload, obtain from create_payload() or get_payloads()
539  firstExp (int): first experiment for which this iov is valid
540  firstRun (int): first run for which this iov is valid
541  finalExp (int): final experiment for which this iov is valid
542  finalRun (int): final run for which this iov is valid
543 
544  Returns:
545  payloadIovId of the created iov, None if creation was not successful
546  """
547  try:
548  # try to convert all arguments except self to integers to make sure they are
549  # valid.
550  local_variables = locals()
551  variables = {e: int(local_variables[e]) for e in
552  ["globalTagId", "payloadId", "firstExp", "firstRun", "finalExp", "finalRun"]}
553  except ValueError:
554  B2ERROR("create_iov: All parameters need to be integers")
555  return None
556 
557  # try to create the iov
558  try:
559  req = self.request("POST", "/globalTagPayload/{globalTagId},{payloadId}"
560  "/payloadIov/{firstExp},{firstRun},{finalExp},{finalRun}".format(**variables))
561  except ConditionsDB.RequestError as e:
562  B2ERROR("Could not create IOV: {}".format(e))
563  return None
564 
565  return req.json()["payloadIovId"]
566 
567  def get_iovs(self, globalTagName):
568  """Return existing iovs for a given tag name. It returns a dictionary
569  which maps (payloadId, first runId, final runId) to iovId"""
570 
571  try:
572  req = self.request("GET", "/globalTag/{globalTagName}/globalTagPayloads"
573  .format(globalTagName=encode_name(globalTagName)))
574  except ConditionsDB.RequestError as e:
575  # there could be just no iovs so no error
576  return {}
577 
578  result = {}
579  for payload in req.json():
580  payloadId = payload["payloadId"]["payloadId"]
581  for iov in payload["payloadIovs"]:
582  iovId = iov["payloadIovId"]
583  firstExp, firstRun = iov["expStart"], iov["runStart"]
584  finalExp, finalRun = iov["expEnd"], iov["runEnd"]
585  result[(payloadId, firstExp, firstRun, finalExp, finalRun)] = iovId
586 
587  return result
588 
    def upload(self, filename, global_tag, normalize=False, ignore_existing=False, nprocess=1, uploaded_entries=None):
        """
        Upload a testing payload storage to the conditions database.

        The entries of the storage file are de-duplicated, the payload files are
        uploaded (one upload per distinct name/checksum) and finally an iov is
        created for every entry. Uploads and iov creation run in a thread pool.

        Parameters:
            filename (str): filename of the testing payload storage file that should be uploaded
            global_tag (str): name of the globaltag to which the data should be uploaded
            normalize (bool/str): if True the payload root files will be normalized to have the same checksum for the same content,
                if normalize is a string in addition the file name in the root file metadata will be set to it
            ignore_existing (bool): if True do not upload payloads that already exist
                NOTE(review): the code below downloads the lists of existing
                payloads/iovs (and skips what is found there) only if this flag
                is False; with True nothing is looked up, so every payload is
                uploaded and every iov created - confirm the intended meaning
            nprocess (int): maximal number of parallel uploads
            uploaded_entries (list): the list of successfully uploaded entries

        Returns:
            True if the upload was successful
        """

        # first create a list of payloads
        from conditions_db.testing_payloads import parse_testing_payloads_file
        B2INFO(f"Reading payload list from {filename}")
        entries = parse_testing_payloads_file(filename)
        if entries is None:
            B2ERROR(f"Problems with testing payload storage file {filename}, exiting")
            return False

        if not entries:
            B2INFO(f"No payloads found in {filename}, exiting")
            return True

        B2INFO(f"Found {len(entries)} iovs to upload")

        # time to get the id for the globaltag
        tagId = self.get_globalTagInfo(global_tag)
        if tagId is None:
            return False
        tagId = tagId["globalTagId"]

        # now we could have more than one payload with the same iov so let's go over
        # it again and remove duplicates but keep the last one for each
        entries = sorted(set(reversed(entries)))

        if normalize:
            name = normalize if normalize is not True else None
            for e in entries:
                e.normalize(name=name)

        # so let's have a list of all payloads (name, checksum) as some payloads
        # might have multiple iovs. Each payload gets a list of all of those
        payloads = defaultdict(list)
        for e in entries:
            payloads[(e.module, e.checksum)].append(e)

        # both dictionaries are read by the two helper functions below and only
        # filled if the existing content is downloaded further down
        existing_payloads = {}
        existing_iovs = {}

        def upload_payload(item):
            """Upload a payload file if necessary but first check list of existing payloads"""
            key, entries = item
            if key in existing_payloads:
                B2INFO(f"{key[0]} (md5:{key[1]}) already existing in database, skipping.")
                payload_id = existing_payloads[key]
            else:
                # all entries of this key share name and checksum, so the file
                # of the first one is uploaded
                entry = entries[0]
                payload_id = self.create_payload(entry.module, entry.filename, entry.checksum)
                if payload_id is None:
                    return False

                B2INFO(f"Created new payload {payload_id} for {entry.module} (md5:{entry.checksum})")

            for entry in entries:
                entry.payload = payload_id

            return True

        def create_iov(entry):
            """Create an iov if necessary but first check the list of existing iovs"""
            if entry.payload is None:
                return None

            iov_key = (entry.payload,) + entry.iov_tuple
            if iov_key in existing_iovs:
                # NOTE(review): the id is stored as `entry.iov` here but as
                # `entry.payloadIovId` in the branch below - confirm which
                # attribute the entry class expects
                entry.iov = existing_iovs[iov_key]
                B2INFO(f"IoV {entry.iov_tuple} for {entry.module} (md5:{entry.checksum}) already existing in database, skipping.")
            else:
                entry.payloadIovId = self.create_iov(tagId, entry.payload, *entry.iov_tuple)
                if entry.payloadIovId is None:
                    return False

                B2INFO(f"Created IoV {entry.iov_tuple} for {entry.module} (md5:{entry.checksum})")

            return entry

        # multithreading for the win ...
        with ThreadPoolExecutor(max_workers=nprocess) as pool:
            # if we want to check for existing payloads/iovs we schedule the download of
            # the full payload list. And write a message as each completes
            if not ignore_existing:
                B2INFO("Downloading information about existing payloads and iovs...")
                futures = []
                existing_iovs = {}
                existing_payloads = {}

                def create_future(iter, func, callback=None):
                    # note the argument names: `iter` is the callable which is
                    # submitted to the pool and `func` is its single argument
                    fn = pool.submit(iter, func)
                    if callback is not None:
                        fn.add_done_callback(callback)
                    futures.append(fn)

                def update_iovs(iovs):
                    # done-callback: `iovs` is the finished future
                    existing_iovs.update(iovs.result())
                    B2INFO(f"Found {len(existing_iovs)} existing iovs in {global_tag}")

                def update_payloads(payloads):
                    # done-callback: `payloads` is the finished future
                    existing_payloads.update(payloads.result())
                    B2INFO(f"Found {len(existing_payloads)} existing payloads")

                create_future(self.get_iovs, global_tag, update_iovs)
                # checking existing payloads should not be done with too many at once
                for chunk in chunks(payloads.keys(), 1000):
                    create_future(self.check_payloads, chunk, update_payloads)

                futures_wait(futures)

            # upload payloads
            failed_payloads = sum(0 if result else 1 for result in pool.map(upload_payload, payloads.items()))
            if failed_payloads > 0:
                B2ERROR(f"{failed_payloads} payloads could not be uploaded")

            # create IoVs
            failed_iovs = 0
            for entry in pool.map(create_iov, entries):
                # the helper returns the entry on success and None/False otherwise
                if entry:
                    if uploaded_entries is not None:
                        uploaded_entries.append(entry)
                else:
                    failed_iovs += 1
            if failed_iovs > 0:
                B2ERROR(f"{failed_iovs} IoVs could not be created")

        # update revision numbers
        if uploaded_entries is not None:
            self.get_revisions(uploaded_entries)

        return failed_payloads + failed_iovs == 0
733 
734  def staging_request(self, filename, normalize, data, password):
735  """
736  Upload a testing payload storage to a staging globaltag and create or update a jira issue
737 
738  Parameters:
739  filename (str): filename of the testing payload storage file that should be uploaded
740  normalize (bool/str): if True the payload root files will be
741  normalized to have the same checksum for the same content, if
742  normalize is a string in addition the file name in the root file
743  metadata will be set to it
744  data (dict): a dictionary with the information provided by the user:
745 
746  * task: category of globaltag, either master, online, prompt, data, mc, or analysis
747  * tag: the globaltage name
748  * request: type of request, either Update, New, or Modification. The latter two imply task == master because
749  if new payload classes are introduced or payload classes are modified then they will first be included in
750  the master globaltag. Here a synchronization of code and payload changes has to be managed.
751  If new or modified payload classes should be included in other globaltags they must already be in a release.
752  * pull-request: number of the pull request containing new or modified payload classes,
753  only for request == New or Modified
754  * backward-compatibility: description of what happens if the old payload is encountered by the updated code,
755  only for request == Modified
756  * forward-compatibility: description of what happens if a new payload is encountered by the existing code,
757  only for request == Modified
758  * release: the required release version
759  * reason: the reason for the request
760  * description: a detailed description for the globaltag manager
761  * issue: identifier of an existing jira issue (optional)
762  * user: name of the user
763  * time: time stamp of the request
764 
765  password: the password for access to jira or the access token and secret for oauth access
766 
767  Returns:
768  True if the upload and jira issue creation/upload was successful
769  """
770 
771  # determine the staging globaltag name
772  data['tag'] = upload_global_tag(data['task'])
773  if data['tag'] is None:
774  data['tag'] = f"staging_{data['task']}_{data['user']}_{data['time']}"
775 
776  # create the staging globaltag if it does not exists yet
777  if not self.has_globalTag(data['tag']):
778  if not self.create_globalTag(data['tag'], data['reason'], data['user']):
779  return False
780 
781  # upload the payloads
782  B2INFO(f"Uploading testing database {filename} to globaltag {data['tag']}")
783  entries = []
784  if not self.upload(filename, data['tag'], normalize, uploaded_entries=entries):
785  return False
786 
787  # get the dictionary for the jira issue creation/update
788  if data['issue']:
789  issue = data['issue']
790  else:
791  issue = jira_global_tag_v2(data['task'])
792  if issue is None:
793  issue = {"components": [{"name": "globaltag"}]}
794 
795  # create jira issue text from provided information
796  if type(issue) is tuple:
797  description = issue[1].format(**data)
798  issue = issue[0]
799  else:
800  description = f"""
801 |*Upload globaltag* | {data['tag']} |
802 |*Request reason* | {data['reason']} |
803 |*Required release* | {data['release']} |
804 |*Type of request* | {data['request']} |
805 """
806  if 'pull-request' in data.keys():
807  description += f"|*Pull request* | \\#{data['pull-request']} |\n"
808  if 'backward-compatibility' in data.keys():
809  description += f"|*Backward compatibility* | \\#{data['backward-compatibility']} |\n"
810  if 'forward-compatibility' in data.keys():
811  description += f"|*Forward compatibility* | \\#{data['forward-compatibility']} |\n"
812  description += '|*Details* |' + ''.join(data['details']) + ' |\n'
813  if data['task'] == 'online':
814  description += f'|*Impact on data taking*|' + ''.join(data['data_taking']) + ' |\n'
815 
816  # add information about uploaded payloads/IoVs
817  description += '\nPayloads\n||Name||Revision||IoV||\n'
818  for entry in entries:
819  description += f"|{entry.module} | {entry.revision} | ({entry.iov_str()}) |\n"
820 
821  # create a new issue
822  if type(issue) is dict:
823  issue["description"] = description
824  if "summary" in issue.keys():
825  issue["summary"] = issue["summary"].format(**data)
826  else:
827  issue["summary"] = f"Globaltag request for {data['task']} by {data['user']} at {data['time']}"
828  if "project" not in issue.keys():
829  issue["project"] = {"key": "BII"}
830  if "issuetype" not in issue.keys():
831  issue["issuetype"] = {"name": "Task"}
832 
833  B2INFO(f"Creating jira issue for {data['task']} globaltag request")
834  if isinstance(password, str):
835  response = requests.post('https://agira.desy.de/rest/api/latest/issue', auth=(data['user'], password),
836  json={'fields': issue})
837  else:
838  fields = {'issue': json.dumps(issue)}
839  if 'user' in data.keys():
840  fields['user'] = data['user']
841  if password:
842  fields['token'] = password[0]
843  fields['secret'] = password[1]
844  response = requests.post('https://b2-master.belle2.org/cgi-bin/jira_issue.py', data=fields)
845  if response.status_code in range(200, 210):
846  B2INFO(f"Issue successfully created: https://agira.desy.de/browse/{response.json()['key']}")
847  else:
848  B2ERROR('The creation of the issue failed: ' + requests.status_codes._codes[response.status_code][0])
849  return False
850 
851  # comment on an existing issue
852  else:
853  # Let's make sure all assignees of new issues are added as watchers
854  # in that case, otherwise they might never find out
855  new_issue_config = jira_global_tag_v2(data['task'])
856  if isinstance(new_issue_config, dict) and "assignee" in new_issue_config:
857  user = new_issue_config['assignee'].get('name', None)
858  if user is not None and isinstance(password, str):
859  response = requests.post(f'https://agira.desy.de/rest/api/latest/issue/{issue}/watchers',
860  auth=(data['user'], password), json=user)
861  if response.status_code in range(200, 210):
862  B2INFO(f"Added {user} as watcher to {issue}")
863  else:
864  B2WARNING(f"Could not add {user} as watcher to {issue}: {response.status_code}")
865 
866  B2INFO(f"Commenting on jira issue {issue} for {data['task']} globaltag request")
867  if isinstance(password, str):
868  response = requests.post('https://agira.desy.de/rest/api/latest/issue/%s/comment' % issue,
869  auth=(data['user'], password), json={'body': description})
870  else:
871  fields = {'id': issue, 'user': user, 'comment': description}
872  if password:
873  fields['token'] = password[0]
874  fields['secret'] = password[1]
875  response = requests.post('https://b2-master.belle2.org/cgi-bin/jira_issue.py', data=fields)
876  if response.status_code in range(200, 210):
877  B2INFO(f"Issue successfully updated: https://agira.desy.de/browse/{issue}")
878  else:
879  B2ERROR('The commenting of the issue failed: ' + requests.status_codes._codes[response.status_code][0])
880  return False
881 
882  return True
883 
884 
def require_database_for_test(timeout=60, base_url=None):
    """Make sure that the database is available and skip the test if not.

    Intended for test scripts which are expected to fail if the database is
    down. The candidate servers are probed in order with a HEAD request; the
    function returns as soon as one answers without error. If none does,
    "TEST SKIPPED" is printed to stderr for test_basf2 and the process exits
    with status 1. An exception is raised if BELLE2_CONDB_GLOBALTAG is set to
    an empty string.

    Note that the ``timeout`` argument is currently not used.
    """
    import sys
    if os.environ.get("BELLE2_CONDB_GLOBALTAG", None) == "":
        raise Exception("Access to the Database is disabled")

    def server_responds(url):
        """Probe one server and report whether it answered without error"""
        try:
            reply = requests.request("HEAD", url.rstrip('/') + "/v2/globalTags")
            reply.raise_for_status()
        except requests.RequestException as e:
            B2WARNING(f"Problem connecting to {url}:\n {e}\n Trying next server ...")
            return False
        return True

    # any() stops probing at the first server which works
    if not any(server_responds(url) for url in ConditionsDB.get_base_urls(base_url)):
        print("TEST SKIPPED: No working database servers configured, giving up", file=sys.stderr)
        sys.exit(1)
907 
908 
def enable_debugging():
    """Switch on verbose output of python-requests to debug http connections"""
    import logging
    from http.client import HTTPConnection

    # Debugging at httplib level (requests->urllib3->http.client): shows the
    # REQUEST, including HEADERS and DATA, and the RESPONSE with HEADERS but
    # without DATA. Only the response body is not logged.
    HTTPConnection.debuglevel = 1

    # Logging has to be initialized, otherwise no debug output is visible.
    logging.basicConfig()
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)

    urllib3_logger = logging.getLogger("requests.packages.urllib3")
    urllib3_logger.setLevel(logging.DEBUG)
    urllib3_logger.propagate = True
conditions_db.PayloadInformation.payload_id
payload_id
payload id in CDB, not used for comparisons
Definition: __init__.py:88
conditions_db.PayloadInformation.iov_id
iov_id
iov id in CDB, not used for comparisons
Definition: __init__.py:90
conditions_db.ConditionsDB._base_url
_base_url
base url to be prepended to all requests
Definition: __init__.py:216
conditions_db.PayloadInformation.readable_iov
def readable_iov(self)
Definition: __init__.py:113
conditions_db.PayloadInformation.__lt__
def __lt__(self, other)
Definition: __init__.py:109
conditions_db.ConditionsDB.get_payloads
def get_payloads(self, global_tag=None)
Definition: __init__.py:412
conditions_db.PayloadInformation.iov
iov
interval of validity
Definition: __init__.py:84
conditions_db.ConditionsDB.get_globalTagInfo
def get_globalTagInfo(self, name)
Definition: __init__.py:326
conditions_db.ConditionsDB.create_iov
def create_iov(self, globalTagId, payloadId, firstExp, firstRun, finalExp, finalRun)
Definition: __init__.py:532
conditions_db.ConditionsDB.check_payloads
def check_payloads(self, payloads, information="payloadId")
Definition: __init__.py:436
conditions_db.PayloadInformation.__hash__
def __hash__(self)
Definition: __init__.py:101
conditions_db.ConditionsDB.create_payload
def create_payload(self, module, filename, checksum=None)
Definition: __init__.py:489
conditions_db.ConditionsDB.get_globalTagType
def get_globalTagType(self, name)
Definition: __init__.py:338
conditions_db.PayloadInformation.payload_url
payload_url
payload url
Definition: __init__.py:94
conditions_db.ConditionsDB.get_all_iovs
def get_all_iovs(self, globalTag, exp=None, run=None, message=None)
Definition: __init__.py:370
conditions_db.ConditionsDB.get_revisions
def get_revisions(self, entries)
Definition: __init__.py:468
conditions_db.PayloadInformation.name
name
name of the payload
Definition: __init__.py:80
conditions_db.ConditionsDB.__init__
def __init__(self, base_url=None, max_connections=10, retries=3)
Definition: __init__.py:186
conditions_db.ConditionsDB.request
def request(self, method, url, message=None, *args, **argk)
Definition: __init__.py:245
conditions_db.ConditionsDB.RequestError
Definition: __init__.py:150
conditions_db.ConditionsDB._session
_session
session object to get keep-alive support and connection pooling
Definition: __init__.py:197
conditions_db.PayloadInformation.revision
revision
revision, not used for comparisons
Definition: __init__.py:86
conditions_db.PayloadInformation.checksum
checksum
checksum of the payload
Definition: __init__.py:82
conditions_db.PayloadInformation.url
def url(self)
Definition: __init__.py:97
conditions_db.ConditionsDB.get_iovs
def get_iovs(self, globalTagName)
Definition: __init__.py:567
conditions_db.ConditionsDB.get_base_urls
def get_base_urls(given_url)
Definition: __init__.py:155
conditions_db.ConditionsDB.has_globalTag
def has_globalTag(self, name)
Definition: __init__.py:316
conditions_db.ConditionsDB
Definition: __init__.py:144
conditions_db.ConditionsDB.create_globalTag
def create_globalTag(self, name, description, user)
Definition: __init__.py:357
conditions_db.ConditionsDB.set_authentication
def set_authentication(self, user, password, basic=True)
Definition: __init__.py:233
conditions_db.PayloadInformation.base_url
base_url
base url
Definition: __init__.py:92
conditions_db.testing_payloads
Definition: testing_payloads.py:1
conditions_db.ConditionsDB.upload
def upload(self, filename, global_tag, normalize=False, ignore_existing=False, nprocess=1, uploaded_entries=None)
Definition: __init__.py:589
conditions_db.PayloadInformation.__eq__
def __eq__(self, other)
Definition: __init__.py:105
conditions_db.ConditionsDB.staging_request
def staging_request(self, filename, normalize, data, password)
Definition: __init__.py:734
conditions_db.PayloadInformation.__init__
def __init__(self, payload_id, name, revision, checksum, payload_url, base_url, iov_id=None, iov=None)
Definition: __init__.py:75
conditions_db.ConditionsDB.get_globalTags
def get_globalTags(self)
Definition: __init__.py:300
conditions_db.PayloadInformation.from_json
def from_json(cls, payload, iov=None)
Definition: __init__.py:54
conditions_db.PayloadInformation
Definition: __init__.py:49