Belle II Software  release-05-02-19
__init__.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 """
5 conditions_db
6 -------------
7 
8 Python interface to the ConditionsDB
9 """
10 
11 import os
12 from basf2 import B2FATAL, B2ERROR, B2INFO, B2WARNING
13 import requests
14 from requests.auth import HTTPBasicAuth, HTTPDigestAuth
15 from requests.packages.urllib3.fields import RequestField
16 from requests.packages.urllib3.filepost import encode_multipart_formdata
17 import json
18 import urllib
19 from versioning import upload_global_tag, jira_global_tag_v2
20 from collections import defaultdict
21 from concurrent.futures import ThreadPoolExecutor, wait as futures_wait
22 import hashlib
23 import itertools
24 
25 
26 def encode_name(name):
27  """Escape name to be used in an url"""
28  return urllib.parse.quote(name, safe="")
29 
30 
31 def file_checksum(filename):
32  """Calculate md5 hash of file"""
33  md5hash = hashlib.md5()
34  with open(filename, "rb") as data:
35  md5hash.update(data.read())
36  return md5hash.hexdigest()
37 
38 
39 def chunks(container, chunk_size):
40  """Cut a container in chunks of max. chunk_size"""
41  it = iter(container)
42  while True:
43  chunk = tuple(itertools.islice(it, chunk_size))
44  if not chunk:
45  return
46  yield chunk
47 
48 
50  """Small container class to help compare payload information for efficient
51  comparison between globaltags"""
52 
53  @classmethod
54  def from_json(cls, payload, iov=None):
55  """Set all internal members from the json information of the payload and the iov.
56 
57  Arguments:
58  payload (dict): json information of the payload as returned by REST api
59  iov (dict): json information of the iov as returned by REST api
60  """
61  if iov is None:
62  iov = {"payloadIovId": None, "expStart": None, "runStart": None, "expEnd": None, "runEnd": None}
63 
64  return cls(
65  payload['payloadId'],
66  payload['basf2Module']['name'],
67  payload['revision'],
68  payload['checksum'],
69  payload['payloadUrl'],
70  payload['baseUrl'],
71  iov['payloadIovId'],
72  (iov["expStart"], iov["runStart"], iov["expEnd"], iov["runEnd"]),
73  )
74 
75  def __init__(self, payload_id, name, revision, checksum, payload_url, base_url, iov_id=None, iov=None):
76  """
77  Create a new object from the given information
78  """
79 
80  self.name = name
81 
82  self.checksum = checksum
83 
84  self.iov = iov
85 
86  self.revision = revision
87 
88  self.payload_id = payload_id
89 
90  self.iov_id = iov_id
91 
92  self.base_url = base_url
93 
94  self.payload_url = payload_url
95 
96  @property
97  def url(self):
98  """Return the full url to the payload on the server"""
99  return urllib.parse.urljoin(self.base_url + '/', self.payload_url)
100 
101  def __hash__(self):
102  """Make object hashable"""
103  return hash((self.name, self.checksum, self.iov))
104 
105  def __eq__(self, other):
106  """Check if two payloads are equal"""
107  return (self.name, self.checksum, self.iov) == (other.name, other.checksum, other.iov)
108 
109  def __lt__(self, other):
110  """Sort payloads by name, iov, revision"""
111  return (self.name.lower(), self.iov, self.revision) < (other.name.lower(), other.iov, other.revision)
112 
113  def readable_iov(self):
114  """return a human readable name for the IoV"""
115  if self.iov is None:
116  return "none"
117 
118  if self.iov == (0, 0, -1, -1):
119  return "always"
120 
121  e1, r1, e2, r2 = self.iov
122  if e1 == e2:
123  if r1 == 0 and r2 == -1:
124  return f"exp {e1}"
125  elif r2 == -1:
126  return f"exp {e1}, runs {r1}+"
127  elif r1 == r2:
128  return f"exp {e1}, run {r1}"
129  else:
130  return f"exp {e1}, runs {r1} - {r2}"
131  else:
132  if e2 == -1 and r1 == 0:
133  return f"exp {e1} - forever"
134  elif e2 == -1:
135  return f"exp {e1}, run {r1} - forever"
136  elif r1 == 0 and r2 == -1:
137  return f"exp {e1}-{e2}, all runs"
138  elif r2 == -1:
139  return f"exp {e1}, run {r1} - exp {e2}, all runs"
140  else:
141  return f"exp {e1}, run {r1} - exp {e2}, run {r2}"
142 
143 
145  """Class to interface conditions db REST interface"""
146 
147 
148  BASE_URLS = ["http://belle2db.sdcc.bnl.gov/b2s/rest/"]
149 
150  class RequestError(RuntimeError):
151  """Class to be thrown by request() if there is any error"""
152  pass
153 
154  @staticmethod
155  def get_base_urls(given_url):
156  """Resolve the list of server urls. If a url is given just return it.
157  Otherwise return servers listed in BELLE2_CONDB_SERVERLIST or the
158  builtin defaults
159 
160  Arguments:
161  given_url (str): Explicit base_url. If this is not None it will be
162  returned as is in a list of length 1
163 
164  Returns:
165  a list of urls to try for database connectivity
166  """
167 
168  base_url_list = ConditionsDB.BASE_URLS[:]
169  base_url_env = os.environ.get("BELLE2_CONDB_SERVERLIST", None)
170  if given_url is not None:
171  base_url_list = [given_url]
172  elif base_url_env is not None:
173  base_url_list = base_url_env.split()
174  B2INFO("Getting Conditions Database servers from Environment:")
175  for i, url in enumerate(base_url_list, 1):
176  B2INFO(f" {i}. {url}")
177  # try to escalate to https for all given urls
178  full_list = []
179  for url in base_url_list:
180  if url.startswith("http://"):
181  full_list.append("https" + url[4:])
182  # but keep the http in case of connection problems
183  full_list.append(url)
184  return full_list
185 
186  def __init__(self, base_url=None, max_connections=10, retries=3):
187  """
188  Create a new instance of the interface
189 
190  Args:
191  base_url (string): base url of the rest interface
192  max_connections (int): number of connections to keep open, mostly useful for threaded applications
193  retries (int): number of retries in case of connection problems
194  """
195 
196 
197  self._session = requests.Session()
198  # and set the connection options we want to have
199  adapter = requests.adapters.HTTPAdapter(
200  pool_connections=max_connections, pool_maxsize=max_connections,
201  max_retries=retries, pool_block=True
202  )
203  self._session.mount("http://", adapter)
204  self._session.mount("https://", adapter)
205  # and also set the proxy settings
206  if "BELLE2_CONDB_PROXY" in os.environ:
207  self._session.proxies = {
208  "http": os.environ.get("BELLE2_CONDB_PROXY"),
209  "https": os.environ.get("BELLE2_CONDB_PROXY"),
210  }
211  # test the given url or try the known defaults
212  base_url_list = ConditionsDB.get_base_urls(base_url)
213 
214  for url in base_url_list:
215 
216  self._base_url = url.rstrip("/") + "/"
217  try:
218  req = self._session.request("HEAD", self._base_url + "v2/globalTags")
219  req.raise_for_status()
220  except requests.RequestException as e:
221  B2WARNING(f"Problem connecting to {url}:\n {e}\n Trying next server ...")
222  else:
223  break
224  else:
225  B2FATAL("No working database servers configured, giving up")
226 
227  # We have a working server so change the api to return json instead of
228  # xml, much easier in python, also request non-cached replies. We do
229  # this now because for the server check above we're fine with cached
230  # results.
231  self._session.headers.update({"Accept": "application/json", "Cache-Control": "no-cache"})
232 
233  def set_authentication(self, user, password, basic=True):
234  """
235  Set authentication credentials when talking to the database
236 
237  Args:
238  user (str): username
239  password (str): password
240  basic (bool): if True us HTTP Basic authentication, otherwise HTTP Digest
241  """
242  authtype = HTTPBasicAuth if basic else HTTPDigestAuth
243  self._session.auth = authtype(user, password)
244 
245  def request(self, method, url, message=None, *args, **argk):
246  """
247  Request function, similar to requests.request but adding the base_url
248 
249  Args:
250  method (str): GET, POST, etc.
251  url (str): url for the request, base_url will be prepended
252  message (str): message to show when starting the request and if it fails
253 
254  All other arguments will be forwarded to requests.request.
255  """
256  if message is not None:
257  B2INFO(message)
258 
259  try:
260  req = self._session.request(method, self._base_url + "v2/" + url.lstrip("/"), *args, **argk)
261  except requests.exceptions.ConnectionError as e:
262  B2FATAL("Could not access '" + self._base_url + url.lstrip("/") + "': " + str(e))
263 
264  if req.status_code >= 300:
265  # Apparently something is not good. Let's try to decode the json
266  # reply containing reason and message
267  try:
268  response = req.json()
269  message = response.get("message", "")
270  colon = ": " if message.strip() else ""
271  error = "Request {method} {url} returned {code} {reason}{colon}{message}".format(
272  method=method, url=url,
273  code=response["code"],
274  reason=response["reason"],
275  message=message,
276  colon=colon,
277  )
278  except json.JSONDecodeError:
279  # seems the reply was not even json
280  error = "Request {method} {url} returned non JSON response {code}: {content}".format(
281  method=method, url=url,
282  code=req.status_code,
283  content=req.content
284  )
285 
286  if message is not None:
287  raise ConditionsDB.RequestError("{} failed: {}".format(message, error))
288  else:
289  raise ConditionsDB.RequestError(error)
290 
291  if method != "HEAD" and req.status_code != requests.codes.no_content:
292  try:
293  req.json()
294  except json.JSONDecodeError as e:
295  B2INFO("Invalid response: {}".format(req.content))
296  raise ConditionsDB.RequestError("{method} {url} returned invalid JSON response {}"
297  .format(e, method=method, url=url))
298  return req
299 
300  def get_globalTags(self):
301  """Get a list of all globaltags. Returns a dictionary with the globaltag
302  names and the corresponding ids in the database"""
303 
304  try:
305  req = self.request("GET", "/globalTags")
306  except ConditionsDB.RequestError as e:
307  B2ERROR("Could not get the list of globaltags: {}".format(e))
308  return None
309 
310  result = {}
311  for tag in req.json():
312  result[tag["name"]] = tag
313 
314  return result
315 
316  def has_globalTag(self, name):
317  """Check whether the globaltag with the given name exists."""
318 
319  try:
320  self.request("GET", "/globalTag/{globalTagName}".format(globalTagName=encode_name(name)))
322  return False
323 
324  return True
325 
326  def get_globalTagInfo(self, name):
327  """Get the id of the globaltag with the given name. Returns either the
328  id or None if the tag was not found"""
329 
330  try:
331  req = self.request("GET", "/globalTag/{globalTagName}".format(globalTagName=encode_name(name)))
332  except ConditionsDB.RequestError as e:
333  B2ERROR("Cannot find globaltag '{}': {}".format(name, e))
334  return None
335 
336  return req.json()
337 
338  def get_globalTagType(self, name):
339  """
340  Get the dictionary describing the given globaltag type (currently
341  one of DEV or RELEASE). Returns None if tag type was not found.
342  """
343  try:
344  req = self.request("GET", "/globalTagType")
345  except ConditionsDB.RequestError as e:
346  B2ERROR("Could not get list of valid globaltag types: {}".format(e))
347  return None
348 
349  types = {e["name"]: e for e in req.json()}
350 
351  if name in types:
352  return types[name]
353 
354  B2ERROR("Unknown globaltag type: '{}', please use one of {}".format(name, ", ".join(types)))
355  return None
356 
357  def create_globalTag(self, name, description, user):
358  """
359  Create a new globaltag
360  """
361  info = {"name": name, "description": description, "modifiedBy": user, "isDefault": False}
362  try:
363  req = self.request("POST", f"/globalTag/DEV", f"Creating globaltag {name}", json=info)
364  except ConditionsDB.RequestError as e:
365  B2ERROR(f"Could not create globaltag {name}: {e}")
366  return None
367 
368  return req.json()
369 
370  def get_all_iovs(self, globalTag, exp=None, run=None, message=None):
371  """
372  Return list of all payloads in the given globaltag where each element is
373  a `PayloadInformation` instance
374 
375  Parameters:
376  gobalTag (str): name of the globaltag
377  exp (int): if given limit the list of payloads to the ones valid for
378  the given exp,run combination
379  run (int): if given limit the list of payloads to the ones valid for
380  the given exp,run combination
381  message (str): additional message to show when downloading the
382  payload information. Will be directly appended to
383  "Obtaining lists of iovs for globaltag {globalTag}"
384 
385  Warning:
386  Both, exp and run, need to be given at the same time. Just supplying
387  an experiment or a run number will not work
388  """
389  globalTag = encode_name(globalTag)
390  if message is None:
391  message = ""
392  if exp is not None:
393  msg = f"Obtaining list of iovs for globaltag {globalTag}, exp={exp}, run={run}{message}"
394  req = self.request("GET", "/iovPayloads", msg, params={'gtName': globalTag, 'expNumber': exp, 'runNumber': run})
395  else:
396  msg = f"Obtaining list of iovs for globaltag {globalTag}{message}"
397  req = self.request("GET", f"/globalTag/{globalTag}/globalTagPayloads", msg)
398  all_iovs = []
399  for item in req.json():
400  payload = item["payload" if 'payload' in item else "payloadId"]
401  if "payloadIov" in item:
402  iovs = [item['payloadIov']]
403  else:
404  iovs = item['payloadIovs']
405 
406  for iov in iovs:
407  all_iovs.append(PayloadInformation.from_json(payload, iov))
408 
409  all_iovs.sort()
410  return all_iovs
411 
412  def get_payloads(self, global_tag=None):
413  """
414  Get a list of all defined payloads (for the given global_tag or by default for all).
415  Returns a dictionary which maps (module, checksum) to the payload id.
416  """
417 
418  try:
419  if global_tag:
420  req = self.request("GET", "/globalTag/{global_tag}/payloads"
421  .format(global_tag=encode_name(global_tag)))
422  else:
423  req = self.request("GET", "/payloads")
424  except ConditionsDB.RequestError as e:
425  B2ERROR("Cannot get list of payloads: {}".format(e))
426  return {}
427 
428  result = {}
429  for payload in req.json():
430  module = payload["basf2Module"]["name"]
431  checksum = payload["checksum"]
432  result[(module, checksum)] = payload["payloadId"]
433 
434  return result
435 
436  def check_payloads(self, payloads, information="payloadId"):
437  """
438  Check for the existence of payloads in the database.
439 
440  Arguments:
441  payloads (list((str,str))): A list of payloads to check for. Each
442  payload needs to be a tuple of the name of the payload and the
443  md5 checksum of the payload file.
444  information (str): The information to be extracted from the
445  payload dictionary
446 
447  Returns:
448  A dictionary with the payload identifiers (name, checksum) as keys
449  and the requested information as values for all payloads which are already
450  present in the database.
451  """
452 
453  search_query = [{"name": e[0], "checksum": e[1]} for e in payloads]
454  try:
455  req = self.request("POST", "/checkPayloads", json=search_query)
456  except ConditionsDB.RequestError as e:
457  B2ERROR("Cannot check for existing payloads: {}".format(e))
458  return {}
459 
460  result = {}
461  for payload in req.json():
462  module = payload["basf2Module"]["name"]
463  checksum = payload["checksum"]
464  result[(module, checksum)] = payload[information]
465 
466  return result
467 
468  def get_revisions(self, entries):
469  """
470  Get the revision numbers of payloads in the database.
471 
472  Arguments:
473  entries (list): A list of payload entries.
474  Each entry must have the attributes module and checksum.
475 
476  Returns:
477  True if successful.
478  """
479 
480  result = self.check_payloads([(entry.module, entry.checksum) for entry in entries], "revision")
481  if not result:
482  return False
483 
484  for entry in entries:
485  entry.revision = result.get((entry.module, entry.checksum), 0)
486 
487  return True
488 
489  def create_payload(self, module, filename, checksum=None):
490  """
491  Create a new payload
492 
493  Args:
494  module (str): name of the module
495  filename (str): name of the file
496  checksum (str): md5 hexdigest of the file. Will be calculated automatically if not given
497  """
498  if checksum is None:
499  checksum = file_checksum(filename)
500 
501  # this is the only complicated request as we have to provide a
502  # multipart/mixed request which is not directly provided by the request
503  # library.
504  files = [
505  (filename, open(filename, "rb").read(), "application/x-root"),
506  ("json", json.dumps({"checksum": checksum, "isDefault": False}), "application/json"),
507  ]
508  # ok we have the two "files" we want to send, create multipart/mixed
509  # body
510  fields = []
511  for name, contents, mimetype in files:
512  rf = RequestField(name=name, data=contents)
513  rf.make_multipart(content_type=mimetype)
514  fields.append(rf)
515 
516  post_body, content_type = encode_multipart_formdata(fields)
517  content_type = ''.join(('multipart/mixed',) + content_type.partition(';')[1:])
518  headers = {'Content-Type': content_type}
519 
520  # now make the request. Note to self: if multipart/form-data would be
521  # accepted this would be so much nicer here. but it works.
522  try:
523  req = self.request("POST", "/package/dbstore/module/{moduleName}/payload"
524  .format(moduleName=encode_name(module)),
525  data=post_body, headers=headers)
526  except ConditionsDB.RequestError as e:
527  B2ERROR("Could not create Payload: {}".format(e))
528  return None
529 
530  return req.json()["payloadId"]
531 
532  def create_iov(self, globalTagId, payloadId, firstExp, firstRun, finalExp, finalRun):
533  """
534  Create an iov.
535 
536  Args:
537  globalTagId (int): id of the globaltag, obtain with get_globalTagId()
538  payloadId (int): id of the payload, obtain from create_payload() or get_payloads()
539  firstExp (int): first experiment for which this iov is valid
540  firstRun (int): first run for which this iov is valid
541  finalExp (int): final experiment for which this iov is valid
542  finalRun (int): final run for which this iov is valid
543 
544  Returns:
545  payloadIovId of the created iov, None if creation was not successful
546  """
547  try:
548  # try to convert all arguments except self to integers to make sure they are
549  # valid.
550  local_variables = locals()
551  variables = {e: int(local_variables[e]) for e in
552  ["globalTagId", "payloadId", "firstExp", "firstRun", "finalExp", "finalRun"]}
553  except ValueError:
554  B2ERROR("create_iov: All parameters need to be integers")
555  return None
556 
557  # try to create the iov
558  try:
559  req = self.request("POST", "/globalTagPayload/{globalTagId},{payloadId}"
560  "/payloadIov/{firstExp},{firstRun},{finalExp},{finalRun}".format(**variables))
561  except ConditionsDB.RequestError as e:
562  B2ERROR("Could not create IOV: {}".format(e))
563  return None
564 
565  return req.json()["payloadIovId"]
566 
567  def get_iovs(self, globalTagName, payloadName=None):
568  """Return existing iovs for a given tag name. It returns a dictionary
569  which maps (payloadId, first runId, final runId) to iovId
570 
571  Parameters:
572  globalTagName(str): Global tag name.
573  payloadName(str): Payload name (if None, selection by name is
574  not performed.
575  """
576 
577  try:
578  req = self.request("GET", "/globalTag/{globalTagName}/globalTagPayloads"
579  .format(globalTagName=encode_name(globalTagName)))
580  except ConditionsDB.RequestError as e:
581  # there could be just no iovs so no error
582  return {}
583 
584  result = {}
585  for payload in req.json():
586  payloadId = payload["payloadId"]["payloadId"]
587  if payloadName is not None:
588  if payload["payloadId"]["basf2Module"]["name"] != payloadName:
589  continue
590  for iov in payload["payloadIovs"]:
591  iovId = iov["payloadIovId"]
592  firstExp, firstRun = iov["expStart"], iov["runStart"]
593  finalExp, finalRun = iov["expEnd"], iov["runEnd"]
594  result[(payloadId, firstExp, firstRun, finalExp, finalRun)] = iovId
595 
596  return result
597 
598  def upload(self, filename, global_tag, normalize=False, ignore_existing=False, nprocess=1, uploaded_entries=None):
599  """
600  Upload a testing payload storage to the conditions database.
601 
602  Parameters:
603  filename (str): filename of the testing payload storage file that should be uploaded
604  global_tage (str): name of the globaltag to which the data should be uploaded
605  normalize (bool/str): if True the payload root files will be normalized to have the same checksum for the same content,
606  if normalize is a string in addition the file name in the root file metadata will be set to it
607  ignore_existing (bool): if True do not upload payloads that already exist
608  nprocess (int): maximal number of parallel uploads
609  uploaded_entries (list): the list of successfully uploaded entries
610 
611  Returns:
612  True if the upload was successful
613  """
614 
615  # first create a list of payloads
616  from conditions_db.testing_payloads import parse_testing_payloads_file
617  B2INFO(f"Reading payload list from {filename}")
618  entries = parse_testing_payloads_file(filename)
619  if entries is None:
620  B2ERROR(f"Problems with testing payload storage file {filename}, exiting")
621  return False
622 
623  if not entries:
624  B2INFO(f"No payloads found in {filename}, exiting")
625  return True
626 
627  B2INFO(f"Found {len(entries)} iovs to upload")
628 
629  # time to get the id for the globaltag
630  tagId = self.get_globalTagInfo(global_tag)
631  if tagId is None:
632  return False
633  tagId = tagId["globalTagId"]
634 
635  # now we could have more than one payload with the same iov so let's go over
636  # it again and remove duplicates but keep the last one for each
637  entries = sorted(set(reversed(entries)))
638 
639  if normalize:
640  name = normalize if normalize is not True else None
641  for e in entries:
642  e.normalize(name=name)
643 
644  # so let's have a list of all payloads (name, checksum) as some payloads
645  # might have multiple iovs. Each payload gets a list of all of those
646  payloads = defaultdict(list)
647  for e in entries:
648  payloads[(e.module, e.checksum)].append(e)
649 
650  existing_payloads = {}
651  existing_iovs = {}
652 
653  def upload_payload(item):
654  """Upload a payload file if necessary but first check list of existing payloads"""
655  key, entries = item
656  if key in existing_payloads:
657  B2INFO(f"{key[0]} (md5:{key[1]}) already existing in database, skipping.")
658  payload_id = existing_payloads[key]
659  else:
660  entry = entries[0]
661  payload_id = self.create_payload(entry.module, entry.filename, entry.checksum)
662  if payload_id is None:
663  return False
664 
665  B2INFO(f"Created new payload {payload_id} for {entry.module} (md5:{entry.checksum})")
666 
667  for entry in entries:
668  entry.payload = payload_id
669 
670  return True
671 
672  def create_iov(entry):
673  """Create an iov if necessary but first check the list of existing iovs"""
674  if entry.payload is None:
675  return None
676 
677  iov_key = (entry.payload,) + entry.iov_tuple
678  if iov_key in existing_iovs:
679  entry.iov = existing_iovs[iov_key]
680  B2INFO(f"IoV {entry.iov_tuple} for {entry.module} (md5:{entry.checksum}) already existing in database, skipping.")
681  else:
682  entry.payloadIovId = self.create_iov(tagId, entry.payload, *entry.iov_tuple)
683  if entry.payloadIovId is None:
684  return False
685 
686  B2INFO(f"Created IoV {entry.iov_tuple} for {entry.module} (md5:{entry.checksum})")
687 
688  return entry
689 
690  # multithreading for the win ...
691  with ThreadPoolExecutor(max_workers=nprocess) as pool:
692  # if we want to check for existing payloads/iovs we schedule the download of
693  # the full payload list. And write a message as each completes
694  if not ignore_existing:
695  B2INFO("Downloading information about existing payloads and iovs...")
696  futures = []
697  existing_iovs = {}
698  existing_payloads = {}
699 
700  def create_future(iter, func, callback=None):
701  fn = pool.submit(iter, func)
702  if callback is not None:
703  fn.add_done_callback(callback)
704  futures.append(fn)
705 
706  def update_iovs(iovs):
707  existing_iovs.update(iovs.result())
708  B2INFO(f"Found {len(existing_iovs)} existing iovs in {global_tag}")
709 
710  def update_payloads(payloads):
711  existing_payloads.update(payloads.result())
712  B2INFO(f"Found {len(existing_payloads)} existing payloads")
713 
714  create_future(self.get_iovs, global_tag, update_iovs)
715  # checking existing payloads should not be done with too many at once
716  for chunk in chunks(payloads.keys(), 1000):
717  create_future(self.check_payloads, chunk, update_payloads)
718 
719  futures_wait(futures)
720 
721  # upload payloads
722  failed_payloads = sum(0 if result else 1 for result in pool.map(upload_payload, payloads.items()))
723  if failed_payloads > 0:
724  B2ERROR(f"{failed_payloads} payloads could not be uploaded")
725 
726  # create IoVs
727  failed_iovs = 0
728  for entry in pool.map(create_iov, entries):
729  if entry:
730  if uploaded_entries is not None:
731  uploaded_entries.append(entry)
732  else:
733  failed_iovs += 1
734  if failed_iovs > 0:
735  B2ERROR(f"{failed_iovs} IoVs could not be created")
736 
737  # update revision numbers
738  if uploaded_entries is not None:
739  self.get_revisions(uploaded_entries)
740 
741  return failed_payloads + failed_iovs == 0
742 
743  def staging_request(self, filename, normalize, data, password):
744  """
745  Upload a testing payload storage to a staging globaltag and create or update a jira issue
746 
747  Parameters:
748  filename (str): filename of the testing payload storage file that should be uploaded
749  normalize (bool/str): if True the payload root files will be
750  normalized to have the same checksum for the same content, if
751  normalize is a string in addition the file name in the root file
752  metadata will be set to it
753  data (dict): a dictionary with the information provided by the user:
754 
755  * task: category of globaltag, either master, online, prompt, data, mc, or analysis
756  * tag: the globaltage name
757  * request: type of request, either Update, New, or Modification. The latter two imply task == master because
758  if new payload classes are introduced or payload classes are modified then they will first be included in
759  the master globaltag. Here a synchronization of code and payload changes has to be managed.
760  If new or modified payload classes should be included in other globaltags they must already be in a release.
761  * pull-request: number of the pull request containing new or modified payload classes,
762  only for request == New or Modified
763  * backward-compatibility: description of what happens if the old payload is encountered by the updated code,
764  only for request == Modified
765  * forward-compatibility: description of what happens if a new payload is encountered by the existing code,
766  only for request == Modified
767  * release: the required release version
768  * reason: the reason for the request
769  * description: a detailed description for the globaltag manager
770  * issue: identifier of an existing jira issue (optional)
771  * user: name of the user
772  * time: time stamp of the request
773 
774  password: the password for access to jira or the access token and secret for oauth access
775 
776  Returns:
777  True if the upload and jira issue creation/upload was successful
778  """
779 
780  # determine the staging globaltag name
781  data['tag'] = upload_global_tag(data['task'])
782  if data['tag'] is None:
783  data['tag'] = f"staging_{data['task']}_{data['user']}_{data['time']}"
784 
785  # create the staging globaltag if it does not exists yet
786  if not self.has_globalTag(data['tag']):
787  if not self.create_globalTag(data['tag'], data['reason'], data['user']):
788  return False
789 
790  # upload the payloads
791  B2INFO(f"Uploading testing database {filename} to globaltag {data['tag']}")
792  entries = []
793  if not self.upload(filename, data['tag'], normalize, uploaded_entries=entries):
794  return False
795 
796  # get the dictionary for the jira issue creation/update
797  if data['issue']:
798  issue = data['issue']
799  else:
800  issue = jira_global_tag_v2(data['task'])
801  if issue is None:
802  issue = {"components": [{"name": "globaltag"}]}
803 
804  # create jira issue text from provided information
805  if type(issue) is tuple:
806  description = issue[1].format(**data)
807  issue = issue[0]
808  else:
809  description = f"""
810 |*Upload globaltag* | {data['tag']} |
811 |*Request reason* | {data['reason']} |
812 |*Required release* | {data['release']} |
813 |*Type of request* | {data['request']} |
814 """
815  if 'pull-request' in data.keys():
816  description += f"|*Pull request* | \\#{data['pull-request']} |\n"
817  if 'backward-compatibility' in data.keys():
818  description += f"|*Backward compatibility* | \\#{data['backward-compatibility']} |\n"
819  if 'forward-compatibility' in data.keys():
820  description += f"|*Forward compatibility* | \\#{data['forward-compatibility']} |\n"
821  description += '|*Details* |' + ''.join(data['details']) + ' |\n'
822  if data['task'] == 'online':
823  description += f'|*Impact on data taking*|' + ''.join(data['data_taking']) + ' |\n'
824 
825  # add information about uploaded payloads/IoVs
826  description += '\nPayloads\n||Name||Revision||IoV||\n'
827  for entry in entries:
828  description += f"|{entry.module} | {entry.revision} | ({entry.iov_str()}) |\n"
829 
830  # create a new issue
831  if type(issue) is dict:
832  issue["description"] = description
833  if "summary" in issue.keys():
834  issue["summary"] = issue["summary"].format(**data)
835  else:
836  issue["summary"] = f"Globaltag request for {data['task']} by {data['user']} at {data['time']}"
837  if "project" not in issue.keys():
838  issue["project"] = {"key": "BII"}
839  if "issuetype" not in issue.keys():
840  issue["issuetype"] = {"name": "Task"}
841 
842  B2INFO(f"Creating jira issue for {data['task']} globaltag request")
843  if isinstance(password, str):
844  response = requests.post('https://agira.desy.de/rest/api/latest/issue', auth=(data['user'], password),
845  json={'fields': issue})
846  else:
847  fields = {'issue': json.dumps(issue)}
848  if 'user' in data.keys():
849  fields['user'] = data['user']
850  if password:
851  fields['token'] = password[0]
852  fields['secret'] = password[1]
853  response = requests.post('https://b2-master.belle2.org/cgi-bin/jira_issue.py', data=fields)
854  if response.status_code in range(200, 210):
855  B2INFO(f"Issue successfully created: https://agira.desy.de/browse/{response.json()['key']}")
856  else:
857  B2ERROR('The creation of the issue failed: ' + requests.status_codes._codes[response.status_code][0])
858  return False
859 
860  # comment on an existing issue
861  else:
862  # Let's make sure all assignees of new issues are added as watchers
863  # in that case, otherwise they might never find out
864  new_issue_config = jira_global_tag_v2(data['task'])
865  if isinstance(new_issue_config, dict) and "assignee" in new_issue_config:
866  user = new_issue_config['assignee'].get('name', None)
867  if user is not None and isinstance(password, str):
868  response = requests.post(f'https://agira.desy.de/rest/api/latest/issue/{issue}/watchers',
869  auth=(data['user'], password), json=user)
870  if response.status_code in range(200, 210):
871  B2INFO(f"Added {user} as watcher to {issue}")
872  else:
873  B2WARNING(f"Could not add {user} as watcher to {issue}: {response.status_code}")
874 
875  B2INFO(f"Commenting on jira issue {issue} for {data['task']} globaltag request")
876  if isinstance(password, str):
877  response = requests.post('https://agira.desy.de/rest/api/latest/issue/%s/comment' % issue,
878  auth=(data['user'], password), json={'body': description})
879  else:
880  fields = {'id': issue, 'user': user, 'comment': description}
881  if password:
882  fields['token'] = password[0]
883  fields['secret'] = password[1]
884  response = requests.post('https://b2-master.belle2.org/cgi-bin/jira_issue.py', data=fields)
885  if response.status_code in range(200, 210):
886  B2INFO(f"Issue successfully updated: https://agira.desy.de/browse/{issue}")
887  else:
888  B2ERROR('The commenting of the issue failed: ' + requests.status_codes._codes[response.status_code][0])
889  return False
890 
891  return True
892 
893 
894 def require_database_for_test(timeout=60, base_url=None):
895  """Make sure that the database is available and skip the test if not.
896 
897  This function should be called in test scripts if they are expected to fail
898  if the database is down. It either returns when the database is ok or it
899  will signal test_basf2 that the test should be skipped and exit
900  """
901  import sys
902  if os.environ.get("BELLE2_CONDB_GLOBALTAG", None) == "":
903  raise Exception("Access to the Database is disabled")
904  base_url_list = ConditionsDB.get_base_urls(base_url)
905  for url in base_url_list:
906  try:
907  req = requests.request("HEAD", url.rstrip('/') + "/v2/globalTags")
908  req.raise_for_status()
909  except requests.RequestException as e:
910  B2WARNING(f"Problem connecting to {url}:\n {e}\n Trying next server ...")
911  else:
912  break
913  else:
914  print("TEST SKIPPED: No working database servers configured, giving up", file=sys.stderr)
915  sys.exit(1)
916 
917 
918 def enable_debugging():
919  """Enable verbose output of python-requests to be able to debug http connections"""
920  # These two lines enable debugging at httplib level (requests->urllib3->http.client)
921  # You will see the REQUEST, including HEADERS and DATA, and RESPONSE with HEADERS but without DATA.
922  # The only thing missing will be the response.body which is not logged.
923  import http.client as http_client
924  import logging
925  http_client.HTTPConnection.debuglevel = 1
926  # You must initialize logging, otherwise you'll not see debug output.
927  logging.basicConfig()
928  logging.getLogger().setLevel(logging.DEBUG)
929  requests_log = logging.getLogger("requests.packages.urllib3")
930  requests_log.setLevel(logging.DEBUG)
931  requests_log.propagate = True
conditions_db.PayloadInformation.payload_id
payload_id
payload id in CDB, not used for comparisons
Definition: __init__.py:88
conditions_db.PayloadInformation.iov_id
iov_id
iov id in CDB, not used for comparisons
Definition: __init__.py:90
conditions_db.ConditionsDB._base_url
_base_url
base url to be prepended to all requests
Definition: __init__.py:216
conditions_db.PayloadInformation.readable_iov
def readable_iov(self)
Definition: __init__.py:113
conditions_db.PayloadInformation.__lt__
def __lt__(self, other)
Definition: __init__.py:109
conditions_db.ConditionsDB.get_payloads
def get_payloads(self, global_tag=None)
Definition: __init__.py:412
conditions_db.PayloadInformation.iov
iov
interval of validity
Definition: __init__.py:84
conditions_db.ConditionsDB.get_globalTagInfo
def get_globalTagInfo(self, name)
Definition: __init__.py:326
conditions_db.ConditionsDB.create_iov
def create_iov(self, globalTagId, payloadId, firstExp, firstRun, finalExp, finalRun)
Definition: __init__.py:532
conditions_db.ConditionsDB.check_payloads
def check_payloads(self, payloads, information="payloadId")
Definition: __init__.py:436
conditions_db.PayloadInformation.__hash__
def __hash__(self)
Definition: __init__.py:101
conditions_db.ConditionsDB.create_payload
def create_payload(self, module, filename, checksum=None)
Definition: __init__.py:489
conditions_db.ConditionsDB.get_globalTagType
def get_globalTagType(self, name)
Definition: __init__.py:338
conditions_db.PayloadInformation.payload_url
payload_url
payload url
Definition: __init__.py:94
conditions_db.ConditionsDB.get_all_iovs
def get_all_iovs(self, globalTag, exp=None, run=None, message=None)
Definition: __init__.py:370
conditions_db.ConditionsDB.get_revisions
def get_revisions(self, entries)
Definition: __init__.py:468
conditions_db.PayloadInformation.name
name
name of the payload
Definition: __init__.py:80
conditions_db.ConditionsDB.__init__
def __init__(self, base_url=None, max_connections=10, retries=3)
Definition: __init__.py:186
conditions_db.ConditionsDB.request
def request(self, method, url, message=None, *args, **argk)
Definition: __init__.py:245
conditions_db.ConditionsDB.RequestError
Definition: __init__.py:150
conditions_db.ConditionsDB._session
_session
session object to get keep-alive support and connection pooling
Definition: __init__.py:197
conditions_db.PayloadInformation.revision
revision
revision, not used for comparisons
Definition: __init__.py:86
conditions_db.PayloadInformation.checksum
checksum
checksum of the payload
Definition: __init__.py:82
conditions_db.PayloadInformation.url
def url(self)
Definition: __init__.py:97
conditions_db.ConditionsDB.get_iovs
def get_iovs(self, globalTagName, payloadName=None)
Definition: __init__.py:567
conditions_db.ConditionsDB.get_base_urls
def get_base_urls(given_url)
Definition: __init__.py:155
conditions_db.ConditionsDB.has_globalTag
def has_globalTag(self, name)
Definition: __init__.py:316
conditions_db.ConditionsDB
Definition: __init__.py:144
conditions_db.ConditionsDB.create_globalTag
def create_globalTag(self, name, description, user)
Definition: __init__.py:357
conditions_db.ConditionsDB.set_authentication
def set_authentication(self, user, password, basic=True)
Definition: __init__.py:233
conditions_db.PayloadInformation.base_url
base_url
base url
Definition: __init__.py:92
conditions_db.testing_payloads
Definition: testing_payloads.py:1
conditions_db.ConditionsDB.upload
def upload(self, filename, global_tag, normalize=False, ignore_existing=False, nprocess=1, uploaded_entries=None)
Definition: __init__.py:598
conditions_db.PayloadInformation.__eq__
def __eq__(self, other)
Definition: __init__.py:105
conditions_db.ConditionsDB.staging_request
def staging_request(self, filename, normalize, data, password)
Definition: __init__.py:743
conditions_db.PayloadInformation.__init__
def __init__(self, payload_id, name, revision, checksum, payload_url, base_url, iov_id=None, iov=None)
Definition: __init__.py:75
conditions_db.ConditionsDB.get_globalTags
def get_globalTags(self)
Definition: __init__.py:300
conditions_db.PayloadInformation.from_json
def from_json(cls, payload, iov=None)
Definition: __init__.py:54
conditions_db.PayloadInformation
Definition: __init__.py:49