Belle II Software  release-06-01-15
__init__.py
1 #!/usr/bin/env python3
2 
3 
10 
11 """
12 conditions_db
13 -------------
14 
15 Python interface to the ConditionsDB
16 """
17 
18 import os
19 from basf2 import B2FATAL, B2ERROR, B2INFO, B2WARNING
20 import requests
21 from requests.auth import HTTPBasicAuth, HTTPDigestAuth
22 from requests.packages.urllib3.fields import RequestField
23 from requests.packages.urllib3.filepost import encode_multipart_formdata
24 import json
25 import urllib
26 from versioning import upload_global_tag, jira_global_tag_v2
27 from collections import defaultdict
28 from concurrent.futures import ThreadPoolExecutor, wait as futures_wait
29 import hashlib
30 import itertools
31 
32 
33 def encode_name(name):
34  """Escape name to be used in an url"""
35  return urllib.parse.quote(name, safe="")
36 
37 
38 def file_checksum(filename):
39  """Calculate md5 hash of file"""
40  md5hash = hashlib.md5()
41  with open(filename, "rb") as data:
42  md5hash.update(data.read())
43  return md5hash.hexdigest()
44 
45 
46 def chunks(container, chunk_size):
47  """Cut a container in chunks of max. chunk_size"""
48  it = iter(container)
49  while True:
50  chunk = tuple(itertools.islice(it, chunk_size))
51  if not chunk:
52  return
53  yield chunk
54 
55 
57  """Small container class to help compare payload information for efficient
58  comparison between globaltags"""
59 
60  @classmethod
61  def from_json(cls, payload, iov=None):
62  """Set all internal members from the json information of the payload and the iov.
63 
64  Arguments:
65  payload (dict): json information of the payload as returned by REST api
66  iov (dict): json information of the iov as returned by REST api
67  """
68  if iov is None:
69  iov = {"payloadIovId": None, "expStart": None, "runStart": None, "expEnd": None, "runEnd": None}
70 
71  return cls(
72  payload['payloadId'],
73  payload['basf2Module']['name'],
74  payload['revision'],
75  payload['checksum'],
76  payload['payloadUrl'],
77  payload['baseUrl'],
78  iov['payloadIovId'],
79  (iov["expStart"], iov["runStart"], iov["expEnd"], iov["runEnd"]),
80  )
81 
82  def __init__(self, payload_id, name, revision, checksum, payload_url, base_url, iov_id=None, iov=None):
83  """
84  Create a new object from the given information
85  """
86 
87  self.namename = name
88 
89  self.checksumchecksum = checksum
90 
91  self.ioviov = iov
92 
93  self.revisionrevision = revision
94 
95  self.payload_idpayload_id = payload_id
96 
97  self.iov_idiov_id = iov_id
98 
99  self.base_urlbase_url = base_url
100 
101  self.payload_urlpayload_url = payload_url
102 
103  @property
104  def url(self):
105  """Return the full url to the payload on the server"""
106  return urllib.parse.urljoin(self.base_urlbase_url + '/', self.payload_urlpayload_url)
107 
108  def __hash__(self):
109  """Make object hashable"""
110  return hash((self.namename, self.checksumchecksum, self.ioviov))
111 
112  def __eq__(self, other):
113  """Check if two payloads are equal"""
114  return (self.namename, self.checksumchecksum, self.ioviov) == (other.name, other.checksum, other.iov)
115 
116  def __lt__(self, other):
117  """Sort payloads by name, iov, revision"""
118  return (self.namename.lower(), self.ioviov, self.revisionrevision) < (other.name.lower(), other.iov, other.revision)
119 
120  def readable_iov(self):
121  """return a human readable name for the IoV"""
122  if self.ioviov is None:
123  return "none"
124 
125  if self.ioviov == (0, 0, -1, -1):
126  return "always"
127 
128  e1, r1, e2, r2 = self.ioviov
129  if e1 == e2:
130  if r1 == 0 and r2 == -1:
131  return f"exp {e1}"
132  elif r2 == -1:
133  return f"exp {e1}, runs {r1}+"
134  elif r1 == r2:
135  return f"exp {e1}, run {r1}"
136  else:
137  return f"exp {e1}, runs {r1} - {r2}"
138  else:
139  if e2 == -1 and r1 == 0:
140  return f"exp {e1} - forever"
141  elif e2 == -1:
142  return f"exp {e1}, run {r1} - forever"
143  elif r1 == 0 and r2 == -1:
144  return f"exp {e1}-{e2}, all runs"
145  elif r2 == -1:
146  return f"exp {e1}, run {r1} - exp {e2}, all runs"
147  else:
148  return f"exp {e1}, run {r1} - exp {e2}, run {r2}"
149 
150 
152  """Class to interface conditions db REST interface"""
153 
154 
155  BASE_URLS = ["http://belle2db.sdcc.bnl.gov/b2s/rest/"]
156 
157  class RequestError(RuntimeError):
158  """Class to be thrown by request() if there is any error"""
159  pass
160 
161  @staticmethod
162  def get_base_urls(given_url):
163  """Resolve the list of server urls. If a url is given just return it.
164  Otherwise return servers listed in BELLE2_CONDB_SERVERLIST or the
165  builtin defaults
166 
167  Arguments:
168  given_url (str): Explicit base_url. If this is not None it will be
169  returned as is in a list of length 1
170 
171  Returns:
172  a list of urls to try for database connectivity
173  """
174 
175  base_url_list = ConditionsDB.BASE_URLS[:]
176  base_url_env = os.environ.get("BELLE2_CONDB_SERVERLIST", None)
177  if given_url is not None:
178  base_url_list = [given_url]
179  elif base_url_env is not None:
180  base_url_list = base_url_env.split()
181  B2INFO("Getting Conditions Database servers from Environment:")
182  for i, url in enumerate(base_url_list, 1):
183  B2INFO(f" {i}. {url}")
184  # try to escalate to https for all given urls
185  full_list = []
186  for url in base_url_list:
187  if url.startswith("http://"):
188  full_list.append("https" + url[4:])
189  # but keep the http in case of connection problems
190  full_list.append(url)
191  return full_list
192 
193  def __init__(self, base_url=None, max_connections=10, retries=3):
194  """
195  Create a new instance of the interface
196 
197  Args:
198  base_url (string): base url of the rest interface
199  max_connections (int): number of connections to keep open, mostly useful for threaded applications
200  retries (int): number of retries in case of connection problems
201  """
202 
203 
204  self._session_session = requests.Session()
205  # and set the connection options we want to have
206  adapter = requests.adapters.HTTPAdapter(
207  pool_connections=max_connections, pool_maxsize=max_connections,
208  max_retries=retries, pool_block=True
209  )
210  self._session_session.mount("http://", adapter)
211  self._session_session.mount("https://", adapter)
212  # and also set the proxy settings
213  if "BELLE2_CONDB_PROXY" in os.environ:
214  self._session_session.proxies = {
215  "http": os.environ.get("BELLE2_CONDB_PROXY"),
216  "https": os.environ.get("BELLE2_CONDB_PROXY"),
217  }
218  # test the given url or try the known defaults
219  base_url_list = ConditionsDB.get_base_urls(base_url)
220 
221  for url in base_url_list:
222 
223  self._base_url_base_url = url.rstrip("/") + "/"
224  try:
225  req = self._session_session.request("HEAD", self._base_url_base_url + "v2/globalTags")
226  req.raise_for_status()
227  except requests.RequestException as e:
228  B2WARNING(f"Problem connecting to {url}:\n {e}\n Trying next server ...")
229  else:
230  break
231  else:
232  B2FATAL("No working database servers configured, giving up")
233 
234  # We have a working server so change the api to return json instead of
235  # xml, much easier in python, also request non-cached replies. We do
236  # this now because for the server check above we're fine with cached
237  # results.
238  self._session_session.headers.update({"Accept": "application/json", "Cache-Control": "no-cache"})
239 
240  def set_authentication(self, user, password, basic=True):
241  """
242  Set authentication credentials when talking to the database
243 
244  Args:
245  user (str): username
246  password (str): password
247  basic (bool): if True us HTTP Basic authentication, otherwise HTTP Digest
248  """
249  authtype = HTTPBasicAuth if basic else HTTPDigestAuth
250  self._session_session.auth = authtype(user, password)
251 
252  def request(self, method, url, message=None, *args, **argk):
253  """
254  Request function, similar to requests.request but adding the base_url
255 
256  Args:
257  method (str): GET, POST, etc.
258  url (str): url for the request, base_url will be prepended
259  message (str): message to show when starting the request and if it fails
260 
261  All other arguments will be forwarded to requests.request.
262  """
263  if message is not None:
264  B2INFO(message)
265 
266  try:
267  req = self._session_session.request(method, self._base_url_base_url + "v2/" + url.lstrip("/"), *args, **argk)
268  except requests.exceptions.ConnectionError as e:
269  B2FATAL("Could not access '" + self._base_url_base_url + url.lstrip("/") + "': " + str(e))
270 
271  if req.status_code >= 300:
272  # Apparently something is not good. Let's try to decode the json
273  # reply containing reason and message
274  try:
275  response = req.json()
276  message = response.get("message", "")
277  colon = ": " if message.strip() else ""
278  error = "Request {method} {url} returned {code} {reason}{colon}{message}".format(
279  method=method, url=url,
280  code=response["code"],
281  reason=response["reason"],
282  message=message,
283  colon=colon,
284  )
285  except json.JSONDecodeError:
286  # seems the reply was not even json
287  error = "Request {method} {url} returned non JSON response {code}: {content}".format(
288  method=method, url=url,
289  code=req.status_code,
290  content=req.content
291  )
292 
293  if message is not None:
294  raise ConditionsDB.RequestError(f"{message} failed: {error}")
295  else:
296  raise ConditionsDB.RequestError(error)
297 
298  if method != "HEAD" and req.status_code != requests.codes.no_content:
299  try:
300  req.json()
301  except json.JSONDecodeError as e:
302  B2INFO(f"Invalid response: {req.content}")
303  raise ConditionsDB.RequestError("{method} {url} returned invalid JSON response {}"
304  .format(e, method=method, url=url))
305  return req
306 
307  def get_globalTags(self):
308  """Get a list of all globaltags. Returns a dictionary with the globaltag
309  names and the corresponding ids in the database"""
310 
311  try:
312  req = self.requestrequest("GET", "/globalTags")
313  except ConditionsDB.RequestError as e:
314  B2ERROR(f"Could not get the list of globaltags: {e}")
315  return None
316 
317  result = {}
318  for tag in req.json():
319  result[tag["name"]] = tag
320 
321  return result
322 
323  def has_globalTag(self, name):
324  """Check whether the globaltag with the given name exists."""
325 
326  try:
327  self.requestrequest("GET", "/globalTag/{globalTagName}".format(globalTagName=encode_name(name)))
329  return False
330 
331  return True
332 
333  def get_globalTagInfo(self, name):
334  """Get the id of the globaltag with the given name. Returns either the
335  id or None if the tag was not found"""
336 
337  try:
338  req = self.requestrequest("GET", "/globalTag/{globalTagName}".format(globalTagName=encode_name(name)))
339  except ConditionsDB.RequestError as e:
340  B2ERROR(f"Cannot find globaltag '{name}': {e}")
341  return None
342 
343  return req.json()
344 
345  def get_globalTagType(self, name):
346  """
347  Get the dictionary describing the given globaltag type (currently
348  one of DEV or RELEASE). Returns None if tag type was not found.
349  """
350  try:
351  req = self.requestrequest("GET", "/globalTagType")
352  except ConditionsDB.RequestError as e:
353  B2ERROR(f"Could not get list of valid globaltag types: {e}")
354  return None
355 
356  types = {e["name"]: e for e in req.json()}
357 
358  if name in types:
359  return types[name]
360 
361  B2ERROR("Unknown globaltag type: '{}', please use one of {}".format(name, ", ".join(types)))
362  return None
363 
364  def create_globalTag(self, name, description, user):
365  """
366  Create a new globaltag
367  """
368  info = {"name": name, "description": description, "modifiedBy": user, "isDefault": False}
369  try:
370  req = self.requestrequest("POST", "/globalTag/DEV", f"Creating globaltag {name}", json=info)
371  except ConditionsDB.RequestError as e:
372  B2ERROR(f"Could not create globaltag {name}: {e}")
373  return None
374 
375  return req.json()
376 
377  def get_all_iovs(self, globalTag, exp=None, run=None, message=None):
378  """
379  Return list of all payloads in the given globaltag where each element is
380  a `PayloadInformation` instance
381 
382  Parameters:
383  gobalTag (str): name of the globaltag
384  exp (int): if given limit the list of payloads to the ones valid for
385  the given exp,run combination
386  run (int): if given limit the list of payloads to the ones valid for
387  the given exp,run combination
388  message (str): additional message to show when downloading the
389  payload information. Will be directly appended to
390  "Obtaining lists of iovs for globaltag {globalTag}"
391 
392  Warning:
393  Both, exp and run, need to be given at the same time. Just supplying
394  an experiment or a run number will not work
395  """
396  globalTag = encode_name(globalTag)
397  if message is None:
398  message = ""
399  if exp is not None:
400  msg = f"Obtaining list of iovs for globaltag {globalTag}, exp={exp}, run={run}{message}"
401  req = self.requestrequest("GET", "/iovPayloads", msg, params={'gtName': globalTag, 'expNumber': exp, 'runNumber': run})
402  else:
403  msg = f"Obtaining list of iovs for globaltag {globalTag}{message}"
404  req = self.requestrequest("GET", f"/globalTag/{globalTag}/globalTagPayloads", msg)
405  all_iovs = []
406  for item in req.json():
407  payload = item["payload" if 'payload' in item else "payloadId"]
408  if "payloadIov" in item:
409  iovs = [item['payloadIov']]
410  else:
411  iovs = item['payloadIovs']
412 
413  for iov in iovs:
414  all_iovs.append(PayloadInformation.from_json(payload, iov))
415 
416  all_iovs.sort()
417  return all_iovs
418 
419  def get_payloads(self, global_tag=None):
420  """
421  Get a list of all defined payloads (for the given global_tag or by default for all).
422  Returns a dictionary which maps (module, checksum) to the payload id.
423  """
424 
425  try:
426  if global_tag:
427  req = self.requestrequest("GET", "/globalTag/{global_tag}/payloads"
428  .format(global_tag=encode_name(global_tag)))
429  else:
430  req = self.requestrequest("GET", "/payloads")
431  except ConditionsDB.RequestError as e:
432  B2ERROR(f"Cannot get list of payloads: {e}")
433  return {}
434 
435  result = {}
436  for payload in req.json():
437  module = payload["basf2Module"]["name"]
438  checksum = payload["checksum"]
439  result[(module, checksum)] = payload["payloadId"]
440 
441  return result
442 
443  def check_payloads(self, payloads, information="payloadId"):
444  """
445  Check for the existence of payloads in the database.
446 
447  Arguments:
448  payloads (list((str,str))): A list of payloads to check for. Each
449  payload needs to be a tuple of the name of the payload and the
450  md5 checksum of the payload file.
451  information (str): The information to be extracted from the
452  payload dictionary
453 
454  Returns:
455  A dictionary with the payload identifiers (name, checksum) as keys
456  and the requested information as values for all payloads which are already
457  present in the database.
458  """
459 
460  search_query = [{"name": e[0], "checksum": e[1]} for e in payloads]
461  try:
462  req = self.requestrequest("POST", "/checkPayloads", json=search_query)
463  except ConditionsDB.RequestError as e:
464  B2ERROR(f"Cannot check for existing payloads: {e}")
465  return {}
466 
467  result = {}
468  for payload in req.json():
469  module = payload["basf2Module"]["name"]
470  checksum = payload["checksum"]
471  result[(module, checksum)] = payload[information]
472 
473  return result
474 
475  def get_revisions(self, entries):
476  """
477  Get the revision numbers of payloads in the database.
478 
479  Arguments:
480  entries (list): A list of payload entries.
481  Each entry must have the attributes module and checksum.
482 
483  Returns:
484  True if successful.
485  """
486 
487  result = self.check_payloadscheck_payloads([(entry.module, entry.checksum) for entry in entries], "revision")
488  if not result:
489  return False
490 
491  for entry in entries:
492  entry.revision = result.get((entry.module, entry.checksum), 0)
493 
494  return True
495 
496  def create_payload(self, module, filename, checksum=None):
497  """
498  Create a new payload
499 
500  Args:
501  module (str): name of the module
502  filename (str): name of the file
503  checksum (str): md5 hexdigest of the file. Will be calculated automatically if not given
504  """
505  if checksum is None:
506  checksum = file_checksum(filename)
507 
508  # this is the only complicated request as we have to provide a
509  # multipart/mixed request which is not directly provided by the request
510  # library.
511  files = [
512  (filename, open(filename, "rb").read(), "application/x-root"),
513  ("json", json.dumps({"checksum": checksum, "isDefault": False}), "application/json"),
514  ]
515  # ok we have the two "files" we want to send, create multipart/mixed
516  # body
517  fields = []
518  for name, contents, mimetype in files:
519  rf = RequestField(name=name, data=contents)
520  rf.make_multipart(content_type=mimetype)
521  fields.append(rf)
522 
523  post_body, content_type = encode_multipart_formdata(fields)
524  content_type = ''.join(('multipart/mixed',) + content_type.partition(';')[1:])
525  headers = {'Content-Type': content_type}
526 
527  # now make the request. Note to self: if multipart/form-data would be
528  # accepted this would be so much nicer here. but it works.
529  try:
530  req = self.requestrequest("POST", "/package/dbstore/module/{moduleName}/payload"
531  .format(moduleName=encode_name(module)),
532  data=post_body, headers=headers)
533  except ConditionsDB.RequestError as e:
534  B2ERROR(f"Could not create Payload: {e}")
535  return None
536 
537  return req.json()["payloadId"]
538 
539  def create_iov(self, globalTagId, payloadId, firstExp, firstRun, finalExp, finalRun):
540  """
541  Create an iov.
542 
543  Args:
544  globalTagId (int): id of the globaltag, obtain with get_globalTagId()
545  payloadId (int): id of the payload, obtain from create_payload() or get_payloads()
546  firstExp (int): first experiment for which this iov is valid
547  firstRun (int): first run for which this iov is valid
548  finalExp (int): final experiment for which this iov is valid
549  finalRun (int): final run for which this iov is valid
550 
551  Returns:
552  payloadIovId of the created iov, None if creation was not successful
553  """
554  try:
555  # try to convert all arguments except self to integers to make sure they are
556  # valid.
557  local_variables = locals()
558  variables = {e: int(local_variables[e]) for e in
559  ["globalTagId", "payloadId", "firstExp", "firstRun", "finalExp", "finalRun"]}
560  except ValueError:
561  B2ERROR("create_iov: All parameters need to be integers")
562  return None
563 
564  # try to create the iov
565  try:
566  req = self.requestrequest("POST", "/globalTagPayload/{globalTagId},{payloadId}"
567  "/payloadIov/{firstExp},{firstRun},{finalExp},{finalRun}".format(**variables))
568  except ConditionsDB.RequestError as e:
569  B2ERROR(f"Could not create IOV: {e}")
570  return None
571 
572  return req.json()["payloadIovId"]
573 
574  def get_iovs(self, globalTagName, payloadName=None):
575  """Return existing iovs for a given tag name. It returns a dictionary
576  which maps (payloadId, first runId, final runId) to iovId
577 
578  Parameters:
579  globalTagName(str): Global tag name.
580  payloadName(str): Payload name (if None, selection by name is
581  not performed.
582  """
583 
584  try:
585  req = self.requestrequest("GET", "/globalTag/{globalTagName}/globalTagPayloads"
586  .format(globalTagName=encode_name(globalTagName)))
588  # there could be just no iovs so no error
589  return {}
590 
591  result = {}
592  for payload in req.json():
593  payloadId = payload["payloadId"]["payloadId"]
594  if payloadName is not None:
595  if payload["payloadId"]["basf2Module"]["name"] != payloadName:
596  continue
597  for iov in payload["payloadIovs"]:
598  iovId = iov["payloadIovId"]
599  firstExp, firstRun = iov["expStart"], iov["runStart"]
600  finalExp, finalRun = iov["expEnd"], iov["runEnd"]
601  result[(payloadId, firstExp, firstRun, finalExp, finalRun)] = iovId
602 
603  return result
604 
605  def upload(self, filename, global_tag, normalize=False, ignore_existing=False, nprocess=1, uploaded_entries=None):
606  """
607  Upload a testing payload storage to the conditions database.
608 
609  Parameters:
610  filename (str): filename of the testing payload storage file that should be uploaded
611  global_tage (str): name of the globaltag to which the data should be uploaded
612  normalize (bool/str): if True the payload root files will be normalized to have the same checksum for the same content,
613  if normalize is a string in addition the file name in the root file metadata will be set to it
614  ignore_existing (bool): if True do not upload payloads that already exist
615  nprocess (int): maximal number of parallel uploads
616  uploaded_entries (list): the list of successfully uploaded entries
617 
618  Returns:
619  True if the upload was successful
620  """
621 
622  # first create a list of payloads
623  from conditions_db.testing_payloads import parse_testing_payloads_file
624  B2INFO(f"Reading payload list from {filename}")
625  entries = parse_testing_payloads_file(filename)
626  if entries is None:
627  B2ERROR(f"Problems with testing payload storage file {filename}, exiting")
628  return False
629 
630  if not entries:
631  B2INFO(f"No payloads found in {filename}, exiting")
632  return True
633 
634  B2INFO(f"Found {len(entries)} iovs to upload")
635 
636  # time to get the id for the globaltag
637  tagId = self.get_globalTagInfoget_globalTagInfo(global_tag)
638  if tagId is None:
639  return False
640  tagId = tagId["globalTagId"]
641 
642  # now we could have more than one payload with the same iov so let's go over
643  # it again and remove duplicates but keep the last one for each
644  entries = sorted(set(reversed(entries)))
645 
646  if normalize:
647  name = normalize if normalize is not True else None
648  for e in entries:
649  e.normalize(name=name)
650 
651  # so let's have a list of all payloads (name, checksum) as some payloads
652  # might have multiple iovs. Each payload gets a list of all of those
653  payloads = defaultdict(list)
654  for e in entries:
655  payloads[(e.module, e.checksum)].append(e)
656 
657  existing_payloads = {}
658  existing_iovs = {}
659 
660  def upload_payload(item):
661  """Upload a payload file if necessary but first check list of existing payloads"""
662  key, entries = item
663  if key in existing_payloads:
664  B2INFO(f"{key[0]} (md5:{key[1]}) already existing in database, skipping.")
665  payload_id = existing_payloads[key]
666  else:
667  entry = entries[0]
668  payload_id = self.create_payloadcreate_payload(entry.module, entry.filename, entry.checksum)
669  if payload_id is None:
670  return False
671 
672  B2INFO(f"Created new payload {payload_id} for {entry.module} (md5:{entry.checksum})")
673 
674  for entry in entries:
675  entry.payload = payload_id
676 
677  return True
678 
679  def create_iov(entry):
680  """Create an iov if necessary but first check the list of existing iovs"""
681  if entry.payload is None:
682  return None
683 
684  iov_key = (entry.payload,) + entry.iov_tuple
685  if iov_key in existing_iovs:
686  entry.iov = existing_iovs[iov_key]
687  B2INFO(f"IoV {entry.iov_tuple} for {entry.module} (md5:{entry.checksum}) already existing in database, skipping.")
688  else:
689  entry.payloadIovId = self.create_iovcreate_iov(tagId, entry.payload, *entry.iov_tuple)
690  if entry.payloadIovId is None:
691  return False
692 
693  B2INFO(f"Created IoV {entry.iov_tuple} for {entry.module} (md5:{entry.checksum})")
694 
695  return entry
696 
697  # multithreading for the win ...
698  with ThreadPoolExecutor(max_workers=nprocess) as pool:
699  # if we want to check for existing payloads/iovs we schedule the download of
700  # the full payload list. And write a message as each completes
701  if not ignore_existing:
702  B2INFO("Downloading information about existing payloads and iovs...")
703  futures = []
704  existing_iovs = {}
705  existing_payloads = {}
706 
707  def create_future(iter, func, callback=None):
708  fn = pool.submit(iter, func)
709  if callback is not None:
710  fn.add_done_callback(callback)
711  futures.append(fn)
712 
713  def update_iovs(iovs):
714  existing_iovs.update(iovs.result())
715  B2INFO(f"Found {len(existing_iovs)} existing iovs in {global_tag}")
716 
717  def update_payloads(payloads):
718  existing_payloads.update(payloads.result())
719  B2INFO(f"Found {len(existing_payloads)} existing payloads")
720 
721  create_future(self.get_iovsget_iovs, global_tag, update_iovs)
722  # checking existing payloads should not be done with too many at once
723  for chunk in chunks(payloads.keys(), 1000):
724  create_future(self.check_payloadscheck_payloads, chunk, update_payloads)
725 
726  futures_wait(futures)
727 
728  # upload payloads
729  failed_payloads = sum(0 if result else 1 for result in pool.map(upload_payload, payloads.items()))
730  if failed_payloads > 0:
731  B2ERROR(f"{failed_payloads} payloads could not be uploaded")
732 
733  # create IoVs
734  failed_iovs = 0
735  for entry in pool.map(create_iov, entries):
736  if entry:
737  if uploaded_entries is not None:
738  uploaded_entries.append(entry)
739  else:
740  failed_iovs += 1
741  if failed_iovs > 0:
742  B2ERROR(f"{failed_iovs} IoVs could not be created")
743 
744  # update revision numbers
745  if uploaded_entries is not None:
746  self.get_revisionsget_revisions(uploaded_entries)
747 
748  return failed_payloads + failed_iovs == 0
749 
750  def staging_request(self, filename, normalize, data, password):
751  """
752  Upload a testing payload storage to a staging globaltag and create or update a jira issue
753 
754  Parameters:
755  filename (str): filename of the testing payload storage file that should be uploaded
756  normalize (bool/str): if True the payload root files will be
757  normalized to have the same checksum for the same content, if
758  normalize is a string in addition the file name in the root file
759  metadata will be set to it
760  data (dict): a dictionary with the information provided by the user:
761 
762  * task: category of globaltag, either main, online, prompt, data, mc, or analysis
763  * tag: the globaltage name
764  * request: type of request, either Update, New, or Modification. The latter two imply task == main because
765  if new payload classes are introduced or payload classes are modified then they will first be included in
766  the main globaltag. Here a synchronization of code and payload changes has to be managed.
767  If new or modified payload classes should be included in other globaltags they must already be in a release.
768  * pull-request: number of the pull request containing new or modified payload classes,
769  only for request == New or Modified
770  * backward-compatibility: description of what happens if the old payload is encountered by the updated code,
771  only for request == Modified
772  * forward-compatibility: description of what happens if a new payload is encountered by the existing code,
773  only for request == Modified
774  * release: the required release version
775  * reason: the reason for the request
776  * description: a detailed description for the globaltag manager
777  * issue: identifier of an existing jira issue (optional)
778  * user: name of the user
779  * time: time stamp of the request
780 
781  password: the password for access to jira or the access token and secret for oauth access
782 
783  Returns:
784  True if the upload and jira issue creation/upload was successful
785  """
786 
787  # determine the staging globaltag name
788  data['tag'] = upload_global_tag(data['task'])
789  if data['tag'] is None:
790  data['tag'] = f"staging_{data['task']}_{data['user']}_{data['time']}"
791 
792  # create the staging globaltag if it does not exists yet
793  if not self.has_globalTaghas_globalTag(data['tag']):
794  if not self.create_globalTagcreate_globalTag(data['tag'], data['reason'], data['user']):
795  return False
796 
797  # upload the payloads
798  B2INFO(f"Uploading testing database {filename} to globaltag {data['tag']}")
799  entries = []
800  if not self.uploadupload(filename, data['tag'], normalize, uploaded_entries=entries):
801  return False
802 
803  # get the dictionary for the jira issue creation/update
804  if data['issue']:
805  issue = data['issue']
806  else:
807  issue = jira_global_tag_v2(data['task'])
808  if issue is None:
809  issue = {"components": [{"name": "globaltag"}]}
810 
811  # create jira issue text from provided information
812  if type(issue) is tuple:
813  description = issue[1].format(**data)
814  issue = issue[0]
815  else:
816  description = f"""
817 |*Upload globaltag* | {data['tag']} |
818 |*Request reason* | {data['reason']} |
819 |*Required release* | {data['release']} |
820 |*Type of request* | {data['request']} |
821 """
822  if 'pull-request' in data.keys():
823  description += f"|*Pull request* | \\#{data['pull-request']} |\n"
824  if 'backward-compatibility' in data.keys():
825  description += f"|*Backward compatibility* | \\#{data['backward-compatibility']} |\n"
826  if 'forward-compatibility' in data.keys():
827  description += f"|*Forward compatibility* | \\#{data['forward-compatibility']} |\n"
828  description += '|*Details* |' + ''.join(data['details']) + ' |\n'
829  if data['task'] == 'online':
830  description += '|*Impact on data taking*|' + ''.join(data['data_taking']) + ' |\n'
831 
832  # add information about uploaded payloads/IoVs
833  description += '\nPayloads\n||Name||Revision||IoV||\n'
834  for entry in entries:
835  description += f"|{entry.module} | {entry.revision} | ({entry.iov_str()}) |\n"
836 
837  # create a new issue
838  if type(issue) is dict:
839  issue["description"] = description
840  if "summary" in issue.keys():
841  issue["summary"] = issue["summary"].format(**data)
842  else:
843  issue["summary"] = f"Globaltag request for {data['task']} by {data['user']} at {data['time']}"
844  if "project" not in issue.keys():
845  issue["project"] = {"key": "BII"}
846  if "issuetype" not in issue.keys():
847  issue["issuetype"] = {"name": "Task"}
848  if data["task"] == "main":
849  issue["labels"] = ["TUPPR"]
850 
851  B2INFO(f"Creating jira issue for {data['task']} globaltag request")
852  if isinstance(password, str):
853  response = requests.post('https://agira.desy.de/rest/api/latest/issue', auth=(data['user'], password),
854  json={'fields': issue})
855  else:
856  fields = {'issue': json.dumps(issue)}
857  if 'user' in data.keys():
858  fields['user'] = data['user']
859  if password:
860  fields['token'] = password[0]
861  fields['secret'] = password[1]
862  response = requests.post('https://b2-master.belle2.org/cgi-bin/jira_issue.py', data=fields)
863  if response.status_code in range(200, 210):
864  B2INFO(f"Issue successfully created: https://agira.desy.de/browse/{response.json()['key']}")
865  else:
866  B2ERROR('The creation of the issue failed: ' + requests.status_codes._codes[response.status_code][0])
867  return False
868 
869  # comment on an existing issue
870  else:
871  # Let's make sure all assignees of new issues are added as watchers
872  # in that case, otherwise they might never find out
873  new_issue_config = jira_global_tag_v2(data['task'])
874  if isinstance(new_issue_config, dict) and "assignee" in new_issue_config:
875  user = new_issue_config['assignee'].get('name', None)
876  if user is not None and isinstance(password, str):
877  response = requests.post(f'https://agira.desy.de/rest/api/latest/issue/{issue}/watchers',
878  auth=(data['user'], password), json=user)
879  if response.status_code in range(200, 210):
880  B2INFO(f"Added {user} as watcher to {issue}")
881  else:
882  B2WARNING(f"Could not add {user} as watcher to {issue}: {response.status_code}")
883 
884  B2INFO(f"Commenting on jira issue {issue} for {data['task']} globaltag request")
885  if isinstance(password, str):
886  response = requests.post('https://agira.desy.de/rest/api/latest/issue/%s/comment' % issue,
887  auth=(data['user'], password), json={'body': description})
888  else:
889  fields = {'id': issue, 'user': user, 'comment': description}
890  if password:
891  fields['token'] = password[0]
892  fields['secret'] = password[1]
893  response = requests.post('https://b2-master.belle2.org/cgi-bin/jira_issue.py', data=fields)
894  if response.status_code in range(200, 210):
895  B2INFO(f"Issue successfully updated: https://agira.desy.de/browse/{issue}")
896  else:
897  B2ERROR('The commenting of the issue failed: ' + requests.status_codes._codes[response.status_code][0])
898  return False
899 
900  return True
901 
902 
903 def require_database_for_test(timeout=60, base_url=None):
904  """Make sure that the database is available and skip the test if not.
905 
906  This function should be called in test scripts if they are expected to fail
907  if the database is down. It either returns when the database is ok or it
908  will signal test_basf2 that the test should be skipped and exit
909  """
910  import sys
911  if os.environ.get("BELLE2_CONDB_GLOBALTAG", None) == "":
912  raise Exception("Access to the Database is disabled")
913  base_url_list = ConditionsDB.get_base_urls(base_url)
914  for url in base_url_list:
915  try:
916  req = requests.request("HEAD", url.rstrip('/') + "/v2/globalTags")
917  req.raise_for_status()
918  except requests.RequestException as e:
919  B2WARNING(f"Problem connecting to {url}:\n {e}\n Trying next server ...")
920  else:
921  break
922  else:
923  print("TEST SKIPPED: No working database servers configured, giving up", file=sys.stderr)
924  sys.exit(1)
925 
926 
927 def enable_debugging():
928  """Enable verbose output of python-requests to be able to debug http connections"""
929  # These two lines enable debugging at httplib level (requests->urllib3->http.client)
930  # You will see the REQUEST, including HEADERS and DATA, and RESPONSE with HEADERS but without DATA.
931  # The only thing missing will be the response.body which is not logged.
932  import http.client as http_client
933  import logging
934  http_client.HTTPConnection.debuglevel = 1
935  # You must initialize logging, otherwise you'll not see debug output.
936  logging.basicConfig()
937  logging.getLogger().setLevel(logging.DEBUG)
938  requests_log = logging.getLogger("requests.packages.urllib3")
939  requests_log.setLevel(logging.DEBUG)
940  requests_log.propagate = True
def get_payloads(self, global_tag=None)
Definition: __init__.py:419
def get_iovs(self, globalTagName, payloadName=None)
Definition: __init__.py:574
def upload(self, filename, global_tag, normalize=False, ignore_existing=False, nprocess=1, uploaded_entries=None)
Definition: __init__.py:605
def get_globalTagType(self, name)
Definition: __init__.py:345
def check_payloads(self, payloads, information="payloadId")
Definition: __init__.py:443
def has_globalTag(self, name)
Definition: __init__.py:323
def set_authentication(self, user, password, basic=True)
Definition: __init__.py:240
def create_iov(self, globalTagId, payloadId, firstExp, firstRun, finalExp, finalRun)
Definition: __init__.py:539
def request(self, method, url, message=None, *args, **argk)
Definition: __init__.py:252
def __init__(self, base_url=None, max_connections=10, retries=3)
Definition: __init__.py:193
def get_revisions(self, entries)
Definition: __init__.py:475
def create_globalTag(self, name, description, user)
Definition: __init__.py:364
def get_globalTagInfo(self, name)
Definition: __init__.py:333
_session
session object to get keep-alive support and connection pooling
Definition: __init__.py:204
def get_all_iovs(self, globalTag, exp=None, run=None, message=None)
Definition: __init__.py:377
def get_base_urls(given_url)
Definition: __init__.py:162
def staging_request(self, filename, normalize, data, password)
Definition: __init__.py:750
def create_payload(self, module, filename, checksum=None)
Definition: __init__.py:496
_base_url
base url to be prepended to all requests
Definition: __init__.py:223
payload_id
payload id in CDB, not used for comparisons
Definition: __init__.py:95
iov_id
iov id in CDB, not used for comparisons
Definition: __init__.py:97
name
name of the payload
Definition: __init__.py:87
checksum
checksum of the payload
Definition: __init__.py:89
def __init__(self, payload_id, name, revision, checksum, payload_url, base_url, iov_id=None, iov=None)
Definition: __init__.py:82
def from_json(cls, payload, iov=None)
Definition: __init__.py:61
revision
revision, not used for comparisons
Definition: __init__.py:93
iov
interval of validity
Definition: __init__.py:91