Belle II Software  light-2212-foldex
__init__.py
1 #!/usr/bin/env python3
2 
3 
10 
11 """
12 conditions_db
13 -------------
14 
15 Python interface to the ConditionsDB
16 """
17 
18 import os
19 from basf2 import B2FATAL, B2ERROR, B2INFO, B2WARNING
20 import requests
21 from requests.auth import HTTPBasicAuth, HTTPDigestAuth
22 from requests.packages.urllib3.fields import RequestField
23 from requests.packages.urllib3.filepost import encode_multipart_formdata
24 import json
25 import urllib
26 from versioning import upload_global_tag, jira_global_tag_v2
27 from collections import defaultdict
28 from concurrent.futures import ThreadPoolExecutor, wait as futures_wait
29 import hashlib
30 import itertools
31 from typing import Union # noqa
32 
33 
def encode_name(name):
    """Percent-encode ``name`` so it can be embedded in a url path.

    No character is treated as safe, so ``/`` is escaped as well.
    """
    escaped = urllib.parse.quote(name, safe="")
    return escaped
37 
38 
def file_checksum(filename):
    """Calculate the md5 hash of a file.

    The file is read in fixed-size blocks so that large payload files do not
    have to fit into memory at once.

    Arguments:
        filename (str): path of the file to hash

    Returns:
        the md5 digest of the file content as hexadecimal string
    """
    md5hash = hashlib.md5()
    with open(filename, "rb") as data:
        # read() returns b"" at end of file which stops the iteration
        for block in iter(lambda: data.read(1 << 20), b""):
            md5hash.update(block)
    return md5hash.hexdigest()
45 
46 
def chunks(container, chunk_size):
    """Yield consecutive tuples of at most ``chunk_size`` items taken from ``container``.

    Iteration stops as soon as the container is exhausted, so no empty tuple
    is ever produced.
    """
    source = iter(container)
    # two-argument iter(): keep slicing until the empty tuple sentinel shows up
    yield from iter(lambda: tuple(itertools.islice(source, chunk_size)), ())
55 
56 
class PayloadInformation:
    """Small container class to help compare payload information for efficient
    comparison between globaltags"""

    @classmethod
    def from_json(cls, payload, iov=None):
        """Set all internal members from the json information of the payload and the iov.

        Arguments:
            payload (dict): json information of the payload as returned by REST api
            iov (dict): json information of the iov as returned by REST api.
                If None, the iov id and all four iov boundaries are set to None.
        """
        if iov is None:
            iov = {"payloadIovId": None, "expStart": None, "runStart": None, "expEnd": None, "runEnd": None}

        return cls(
            payload['payloadId'],
            payload['basf2Module']['name'],
            payload['revision'],
            payload['checksum'],
            payload['payloadUrl'],
            payload['baseUrl'],
            iov['payloadIovId'],
            (iov["expStart"], iov["runStart"], iov["expEnd"], iov["runEnd"]),
        )

    def __init__(self, payload_id, name, revision, checksum, payload_url, base_url, iov_id=None, iov=None):
        """
        Create a new object from the given information

        Arguments:
            payload_id: id of the payload
            name: name of the payload
            revision: revision of the payload
            checksum: checksum of the payload
            payload_url: url of the payload, joined onto ``base_url`` by `url`
            base_url: base url of the server holding the payload
            iov_id: id of the iov
            iov: tuple ``(first exp, first run, final exp, final run)`` or None
        """
        #: name of the payload
        self.name = name
        #: checksum of the payload
        self.checksum = checksum
        #: interval of validity as tuple (first exp, first run, final exp, final run)
        self.iov = iov
        #: revision of the payload, only used for sorting
        self.revision = revision
        #: payload id in CDB, not used for comparisons
        self.payload_id = payload_id
        #: iov id in CDB, not used for comparisons
        self.iov_id = iov_id
        #: base url of the server, see `url`
        self.base_url = base_url
        #: payload specific part of the url, see `url`
        self.payload_url = payload_url

    @property
    def url(self):
        """Return the full url to the payload on the server"""
        # the trailing slash makes urljoin treat base_url as a directory
        return urllib.parse.urljoin(self.base_url + '/', self.payload_url)

    def __hash__(self):
        """Make object hashable, consistent with `__eq__`"""
        return hash((self.name, self.checksum, self.iov))

    def __eq__(self, other):
        """Check if two payloads are equal: same name, checksum and iov"""
        try:
            return (self.name, self.checksum, self.iov) == (other.name, other.checksum, other.iov)
        except AttributeError:
            # not something that looks like a payload, let python decide
            return NotImplemented

    def __lt__(self, other):
        """Sort payloads by (case insensitive) name, iov, revision"""
        try:
            return (self.name.lower(), self.iov, self.revision) < (other.name.lower(), other.iov, other.revision)
        except AttributeError:
            return NotImplemented

    def readable_iov(self):
        """return a human readable name for the IoV"""
        if self.iov is None:
            return "none"

        if self.iov == (0, 0, -1, -1):
            return "always"

        e1, r1, e2, r2 = self.iov
        if e1 == e2:
            # everything inside one experiment
            if r1 == 0 and r2 == -1:
                return f"exp {e1}"
            elif r2 == -1:
                return f"exp {e1}, runs {r1}+"
            elif r1 == r2:
                return f"exp {e1}, run {r1}"
            else:
                return f"exp {e1}, runs {r1} - {r2}"
        else:
            # spanning experiments, a final experiment of -1 is shown as "forever"
            if e2 == -1 and r1 == 0:
                return f"exp {e1} - forever"
            elif e2 == -1:
                return f"exp {e1}, run {r1} - forever"
            elif r1 == 0 and r2 == -1:
                return f"exp {e1}-{e2}, all runs"
            elif r2 == -1:
                return f"exp {e1}, run {r1} - exp {e2}, all runs"
            else:
                return f"exp {e1}, run {r1} - exp {e2}, run {r2}"
151 
class ConditionsDB:
    """Class to interface conditions db REST interface"""

    #: default server urls, used by get_base_urls() when neither an explicit
    #: url nor BELLE2_CONDB_SERVERLIST is given
    BASE_URLS = ["http://belle2db.sdcc.bnl.gov/b2s/rest/"]

    class RequestError(RuntimeError):
        """Class to be thrown by request() if there is any error"""
        pass

    @staticmethod
    def get_base_urls(given_url):
        """Resolve the list of server urls. If a url is given just return it.
        Otherwise return servers listed in BELLE2_CONDB_SERVERLIST or the
        builtin defaults

        Arguments:
            given_url (str): Explicit base_url. If this is not None it will be
                the only server considered

        Returns:
            a list of urls to try for database connectivity. Every ``http://``
            url is preceded by its ``https://`` variant.
        """

        base_url_list = ConditionsDB.BASE_URLS[:]
        base_url_env = os.environ.get("BELLE2_CONDB_SERVERLIST", None)
        if given_url is not None:
            base_url_list = [given_url]
        elif base_url_env is not None:
            base_url_list = base_url_env.split()
            B2INFO("Getting Conditions Database servers from Environment:")
            for i, url in enumerate(base_url_list, 1):
                B2INFO(f" {i}. {url}")
        # try to escalate to https for all given urls
        full_list = []
        for url in base_url_list:
            if url.startswith("http://"):
                full_list.append("https" + url[4:])
            # but keep the http in case of connection problems
            full_list.append(url)
        return full_list

    def __init__(self, base_url=None, max_connections=10, retries=3):
        """
        Create a new instance of the interface

        Args:
            base_url (string): base url of the rest interface
            max_connections (int): number of connections to keep open, mostly useful for threaded applications
            retries (int): number of retries in case of connection problems
        """

        #: session object to get keep-alive support and connection pooling
        self._session = requests.Session()
        # and set the connection options we want to have
        adapter = requests.adapters.HTTPAdapter(
            pool_connections=max_connections, pool_maxsize=max_connections,
            max_retries=retries, pool_block=True
        )
        self._session.mount("http://", adapter)
        self._session.mount("https://", adapter)
        # and also set the proxy settings
        if "BELLE2_CONDB_PROXY" in os.environ:
            self._session.proxies = {
                "http": os.environ.get("BELLE2_CONDB_PROXY"),
                "https": os.environ.get("BELLE2_CONDB_PROXY"),
            }
        # test the given url or try the known defaults
        base_url_list = ConditionsDB.get_base_urls(base_url)

        for url in base_url_list:
            #: base url to be prepended to all requests
            self._base_url = url.rstrip("/") + "/"
            try:
                req = self._session.request("HEAD", self._base_url + "v2/globalTags")
                req.raise_for_status()
            except requests.RequestException as e:
                B2WARNING(f"Problem connecting to {url}:\n {e}\n Trying next server ...")
            else:
                # first server that answers wins
                break
        else:
            B2FATAL("No working database servers configured, giving up")

        # We have a working server so change the api to return json instead of
        # xml, much easier in python, also request non-cached replies. We do
        # this now because for the server check above we're fine with cached
        # results.
        self._session.headers.update({"Accept": "application/json", "Cache-Control": "no-cache"})

    def set_authentication(self, user, password, basic=True):
        """
        Set authentication credentials when talking to the database

        Args:
            user (str): username
            password (str): password
            basic (bool): if True use HTTP Basic authentication, otherwise HTTP Digest
        """
        authtype = HTTPBasicAuth if basic else HTTPDigestAuth
        self._session.auth = authtype(user, password)

    def request(self, method, url, message=None, *args, **argk):
        """
        Request function, similar to requests.request but adding the base_url

        Args:
            method (str): GET, POST, etc.
            url (str): url for the request, base_url and "v2/" will be prepended
            message (str): message to show when starting the request and if it fails

        All other arguments will be forwarded to requests.request.

        Returns:
            the response object of the request

        Raises:
            ConditionsDB.RequestError: if the status code is 300 or above or
                if a reply that should carry a body is not valid JSON
        """
        if message is not None:
            B2INFO(message)

        full_url = self._base_url + "v2/" + url.lstrip("/")
        try:
            req = self._session.request(method, full_url, *args, **argk)
        except requests.exceptions.ConnectionError as e:
            # NOTE(review): presumably B2FATAL does not return, otherwise
            # `req` would be unbound below -- confirm
            B2FATAL(f"Could not access '{full_url}': {e}")

        if req.status_code >= 300:
            # Apparently something is not good. Let's try to decode the json
            # reply containing reason and message
            try:
                response = req.json()
                # keep this separate from the `message` argument, the caller's
                # message is still needed for the final error text
                server_message = response.get("message") or ""
                colon = ": " if server_message.strip() else ""
                error = "Request {method} {url} returned {code} {reason}{colon}{message}".format(
                    method=method, url=url,
                    code=response.get("code", req.status_code),
                    reason=response.get("reason", req.reason),
                    message=server_message,
                    colon=colon,
                )
            except json.JSONDecodeError:
                # seems the reply was not even json
                error = "Request {method} {url} returned non JSON response {code}: {content}".format(
                    method=method, url=url,
                    code=req.status_code,
                    content=req.content
                )

            if message is not None:
                raise ConditionsDB.RequestError(f"{message} failed: {error}")
            else:
                raise ConditionsDB.RequestError(error)

        # everything except HEAD requests and "no content" replies has to be valid json
        if method != "HEAD" and req.status_code != requests.codes.no_content:
            try:
                req.json()
            except json.JSONDecodeError as e:
                B2INFO(f"Invalid response: {req.content}")
                raise ConditionsDB.RequestError("{method} {url} returned invalid JSON response {}"
                                                .format(e, method=method, url=url)) from e
        return req

    def get_globalTags(self):
        """Get a list of all globaltags. Returns a dictionary mapping the
        globaltag names to the corresponding json entry returned by the
        server, or None if the list could not be obtained"""

        try:
            req = self.request("GET", "/globalTags")
        except ConditionsDB.RequestError as e:
            B2ERROR(f"Could not get the list of globaltags: {e}")
            return None

        return {tag["name"]: tag for tag in req.json()}

    def has_globalTag(self, name):
        """Check whether the globaltag with the given name exists. Any
        request error is interpreted as "does not exist"."""

        try:
            self.request("GET", "/globalTag/{globalTagName}".format(globalTagName=encode_name(name)))
        except ConditionsDB.RequestError:
            return False

        return True

    def get_globalTagInfo(self, name):
        """Get the information about the globaltag with the given name.
        Returns either the decoded json reply of the server or None if the
        tag was not found"""

        try:
            req = self.request("GET", "/globalTag/{globalTagName}".format(globalTagName=encode_name(name)))
        except ConditionsDB.RequestError as e:
            B2ERROR(f"Cannot find globaltag '{name}': {e}")
            return None

        return req.json()

    def get_globalTagType(self, name):
        """
        Get the dictionary describing the given globaltag type (currently
        one of DEV or RELEASE). Returns None if tag type was not found.
        """
        try:
            req = self.request("GET", "/globalTagType")
        except ConditionsDB.RequestError as e:
            B2ERROR(f"Could not get list of valid globaltag types: {e}")
            return None

        types = {e["name"]: e for e in req.json()}

        if name in types:
            return types[name]

        B2ERROR("Unknown globaltag type: '{}', please use one of {}".format(name, ", ".join(types)))
        return None

    def create_globalTag(self, name, description, user):
        """
        Create a new globaltag of type DEV

        Args:
            name (str): name of the new globaltag
            description (str): description of the new globaltag
            user (str): sent as "modifiedBy" to the server

        Returns:
            the decoded json reply of the server, None if the creation failed
        """
        info = {"name": name, "description": description, "modifiedBy": user, "isDefault": False}
        try:
            req = self.request("POST", "/globalTag/DEV", f"Creating globaltag {name}", json=info)
        except ConditionsDB.RequestError as e:
            B2ERROR(f"Could not create globaltag {name}: {e}")
            return None

        return req.json()

    def get_all_iovs(self, globalTag, exp=None, run=None, message=None):
        """
        Return list of all payloads in the given globaltag where each element is
        a `PayloadInformation` instance

        Parameters:
            globalTag (str): name of the globaltag
            exp (int): if given limit the list of payloads to the ones valid for
                the given exp,run combination
            run (int): if given limit the list of payloads to the ones valid for
                the given exp,run combination
            message (str): additional message to show when downloading the
                payload information. Will be directly appended to
                "Obtaining lists of iovs for globaltag {globalTag}"

        Warning:
            Both, exp and run, need to be given at the same time. Just supplying
            an experiment or a run number will not work

        Raises:
            ConditionsDB.RequestError: if the request fails, it is not caught here
        """
        # NOTE(review): the escaped name is also used for the log message and
        # as the 'gtName' query parameter -- confirm this is what the server expects
        globalTag = encode_name(globalTag)
        if message is None:
            message = ""
        if exp is not None:
            msg = f"Obtaining list of iovs for globaltag {globalTag}, exp={exp}, run={run}{message}"
            req = self.request("GET", "/iovPayloads", msg, params={'gtName': globalTag, 'expNumber': exp, 'runNumber': run})
        else:
            msg = f"Obtaining list of iovs for globaltag {globalTag}{message}"
            req = self.request("GET", f"/globalTag/{globalTag}/globalTagPayloads", msg)
        all_iovs = []
        for item in req.json():
            # the two endpoints name the payload and iov fields differently
            payload = item["payload" if 'payload' in item else "payloadId"]
            if "payloadIov" in item:
                iovs = [item['payloadIov']]
            else:
                iovs = item['payloadIovs']

            for iov in iovs:
                all_iovs.append(PayloadInformation.from_json(payload, iov))

        all_iovs.sort()
        return all_iovs

    def get_payloads(self, global_tag=None):
        """
        Get a list of all defined payloads (for the given global_tag or by default for all).
        Returns a dictionary which maps (module, checksum) to the payload id.
        An empty dictionary is returned if the request fails.
        """

        try:
            if global_tag:
                req = self.request("GET", "/globalTag/{global_tag}/payloads"
                                   .format(global_tag=encode_name(global_tag)))
            else:
                req = self.request("GET", "/payloads")
        except ConditionsDB.RequestError as e:
            B2ERROR(f"Cannot get list of payloads: {e}")
            return {}

        result = {}
        for payload in req.json():
            module = payload["basf2Module"]["name"]
            checksum = payload["checksum"]
            result[(module, checksum)] = payload["payloadId"]

        return result

    def check_payloads(self, payloads, information="payloadId"):
        """
        Check for the existence of payloads in the database.

        Arguments:
            payloads (list((str,str))): A list of payloads to check for. Each
                payload needs to be a tuple of the name of the payload and the
                md5 checksum of the payload file.
            information (str): The information to be extracted from the
                payload dictionary

        Returns:
            A dictionary with the payload identifiers (name, checksum) as keys
            and the requested information as values for all payloads which are already
            present in the database. Empty if the request fails.
        """

        search_query = [{"name": e[0], "checksum": e[1]} for e in payloads]
        try:
            req = self.request("POST", "/checkPayloads", json=search_query)
        except ConditionsDB.RequestError as e:
            B2ERROR(f"Cannot check for existing payloads: {e}")
            return {}

        result = {}
        for payload in req.json():
            module = payload["basf2Module"]["name"]
            checksum = payload["checksum"]
            result[(module, checksum)] = payload[information]

        return result

    def get_revisions(self, entries):
        """
        Get the revision numbers of payloads in the database.

        Arguments:
            entries (list): A list of payload entries.
                Each entry must have the attributes module and checksum. The
                attribute revision of each entry is set, to 0 if the payload
                is not known to the database.

        Returns:
            True if successful, False if no information could be obtained.
        """

        result = self.check_payloads([(entry.module, entry.checksum) for entry in entries], "revision")
        if not result:
            return False

        for entry in entries:
            entry.revision = result.get((entry.module, entry.checksum), 0)

        return True

    def create_payload(self, module, filename, checksum=None):
        """
        Create a new payload

        Args:
            module (str): name of the module
            filename (str): name of the file
            checksum (str): md5 hexdigest of the file. Will be calculated automatically if not given

        Returns:
            the payloadId of the created payload, None if the creation failed
        """
        if checksum is None:
            checksum = file_checksum(filename)

        # read the file content up front and make sure the file is closed again
        with open(filename, "rb") as payload_file:
            file_content = payload_file.read()

        # this is the only complicated request as we have to provide a
        # multipart/mixed request which is not directly provided by the request
        # library.
        files = [
            (filename, file_content, "application/x-root"),
            ("json", json.dumps({"checksum": checksum, "isDefault": False}), "application/json"),
        ]
        # ok we have the two "files" we want to send, create multipart/mixed
        # body
        fields = []
        for name, contents, mimetype in files:
            rf = RequestField(name=name, data=contents)
            rf.make_multipart(content_type=mimetype)
            fields.append(rf)

        post_body, content_type = encode_multipart_formdata(fields)
        # replace the mime type but keep the ";..." remainder of the header
        content_type = ''.join(('multipart/mixed',) + content_type.partition(';')[1:])
        headers = {'Content-Type': content_type}

        # now make the request. Note to self: if multipart/form-data would be
        # accepted this would be so much nicer here. but it works.
        try:
            req = self.request("POST", "/package/dbstore/module/{moduleName}/payload"
                               .format(moduleName=encode_name(module)),
                               data=post_body, headers=headers)
        except ConditionsDB.RequestError as e:
            B2ERROR(f"Could not create Payload: {e}")
            return None

        return req.json()["payloadId"]

    def create_iov(self, globalTagId, payloadId, firstExp, firstRun, finalExp, finalRun):
        """
        Create an iov.

        Args:
            globalTagId (int): id of the globaltag, e.g. the "globalTagId" entry of get_globalTagInfo()
            payloadId (int): id of the payload, obtain from create_payload() or get_payloads()
            firstExp (int): first experiment for which this iov is valid
            firstRun (int): first run for which this iov is valid
            finalExp (int): final experiment for which this iov is valid
            finalRun (int): final run for which this iov is valid

        Returns:
            payloadIovId of the created iov, None if creation was not successful
        """
        try:
            # try to convert all arguments to integers to make sure they are
            # valid. int() raises TypeError for e.g. None so catch that as well
            variables = {
                "globalTagId": int(globalTagId),
                "payloadId": int(payloadId),
                "firstExp": int(firstExp),
                "firstRun": int(firstRun),
                "finalExp": int(finalExp),
                "finalRun": int(finalRun),
            }
        except (ValueError, TypeError):
            B2ERROR("create_iov: All parameters need to be integers")
            return None

        # try to create the iov
        try:
            req = self.request("POST", "/globalTagPayload/{globalTagId},{payloadId}"
                               "/payloadIov/{firstExp},{firstRun},{finalExp},{finalRun}".format(**variables))
        except ConditionsDB.RequestError as e:
            B2ERROR(f"Could not create IOV: {e}")
            return None

        return req.json()["payloadIovId"]

    def get_iovs(self, globalTagName, payloadName=None):
        """Return existing iovs for a given tag name. It returns a dictionary
        which maps (payloadId, first exp, first run, final exp, final run) to
        the iovId

        Parameters:
            globalTagName(str): Global tag name.
            payloadName(str): Payload name (if None, selection by name is
                not performed).
        """

        try:
            req = self.request("GET", "/globalTag/{globalTagName}/globalTagPayloads"
                               .format(globalTagName=encode_name(globalTagName)))
        except ConditionsDB.RequestError:
            # there could be just no iovs so no error
            return {}

        result = {}
        for payload in req.json():
            payloadId = payload["payloadId"]["payloadId"]
            if payloadName is not None:
                if payload["payloadId"]["basf2Module"]["name"] != payloadName:
                    continue
            for iov in payload["payloadIovs"]:
                iovId = iov["payloadIovId"]
                firstExp, firstRun = iov["expStart"], iov["runStart"]
                finalExp, finalRun = iov["expEnd"], iov["runEnd"]
                result[(payloadId, firstExp, firstRun, finalExp, finalRun)] = iovId

        return result

    def upload(self, filename, global_tag, normalize=False, ignore_existing=False, nprocess=1, uploaded_entries=None):
        """
        Upload a testing payload storage to the conditions database.

        Parameters:
            filename (str): filename of the testing payload storage file that should be uploaded
            global_tag (str): name of the globaltag to which the data should be uploaded
            normalize (Union[bool, str]): if True the payload root files will be normalized to have the same checksum for the
                same content, if normalize is a string in addition the file name in the root file metadata will be set to it
            ignore_existing (bool): if True the payloads and iovs already present in the database are not looked up,
                so everything is uploaded and created without checking for existing entries. If False (default)
                payloads and iovs found in the database are skipped.
            nprocess (int): maximal number of parallel uploads
            uploaded_entries (list): if not None, successfully processed entries are appended to this list
                and their revision numbers are updated

        Returns:
            True if the upload was successful
        """

        # first create a list of payloads
        from conditions_db.testing_payloads import parse_testing_payloads_file
        B2INFO(f"Reading payload list from {filename}")
        entries = parse_testing_payloads_file(filename)
        if entries is None:
            B2ERROR(f"Problems with testing payload storage file {filename}, exiting")
            return False

        if not entries:
            B2INFO(f"No payloads found in {filename}, exiting")
            return True

        B2INFO(f"Found {len(entries)} iovs to upload")

        # time to get the id for the globaltag
        tagId = self.get_globalTagInfo(global_tag)
        if tagId is None:
            return False
        tagId = tagId["globalTagId"]

        # now we could have more than one payload with the same iov so let's go over
        # it again and remove duplicates but keep the last one for each
        entries = sorted(set(reversed(entries)))

        if normalize:
            name = normalize if normalize is not True else None
            for e in entries:
                e.normalize(name=name)

        # so let's have a list of all payloads (name, checksum) as some payloads
        # might have multiple iovs. Each payload gets a list of all of those
        payloads = defaultdict(list)
        for e in entries:
            payloads[(e.module, e.checksum)].append(e)

        # filled from the database below unless ignore_existing is set
        existing_payloads = {}
        existing_iovs = {}

        def upload_payload(item):
            """Upload a payload file if necessary but first check list of existing payloads"""
            key, entries = item
            if key in existing_payloads:
                B2INFO(f"{key[0]} (md5:{key[1]}) already existing in database, skipping.")
                payload_id = existing_payloads[key]
            else:
                entry = entries[0]
                payload_id = self.create_payload(entry.module, entry.filename, entry.checksum)
                if payload_id is None:
                    return False

                B2INFO(f"Created new payload {payload_id} for {entry.module} (md5:{entry.checksum})")

            # all entries sharing this (module, checksum) use the same payload
            for entry in entries:
                entry.payload = payload_id

            return True

        def create_iov(entry):
            """Create an iov if necessary but first check the list of existing iovs"""
            if entry.payload is None:
                return None

            iov_key = (entry.payload,) + entry.iov_tuple
            if iov_key in existing_iovs:
                # NOTE(review): the existing id is stored as `iov` while a new
                # one is stored as `payloadIovId` below -- confirm this is intended
                entry.iov = existing_iovs[iov_key]
                B2INFO(f"IoV {entry.iov_tuple} for {entry.module} (md5:{entry.checksum}) already existing in database, skipping.")
            else:
                entry.payloadIovId = self.create_iov(tagId, entry.payload, *entry.iov_tuple)
                if entry.payloadIovId is None:
                    return False

                B2INFO(f"Created IoV {entry.iov_tuple} for {entry.module} (md5:{entry.checksum})")

            return entry

        # multithreading for the win ...
        with ThreadPoolExecutor(max_workers=nprocess) as pool:
            # if we want to check for existing payloads/iovs we schedule the download of
            # the full payload list. And write a message as each completes
            if not ignore_existing:
                B2INFO("Downloading information about existing payloads and iovs...")
                futures = []

                def create_future(func, argument, callback=None):
                    """Schedule func(argument) on the pool and remember the future"""
                    future = pool.submit(func, argument)
                    if callback is not None:
                        future.add_done_callback(callback)
                    futures.append(future)

                def update_iovs(iovs):
                    existing_iovs.update(iovs.result())
                    B2INFO(f"Found {len(existing_iovs)} existing iovs in {global_tag}")

                def update_payloads(payloads):
                    existing_payloads.update(payloads.result())
                    B2INFO(f"Found {len(existing_payloads)} existing payloads")

                create_future(self.get_iovs, global_tag, update_iovs)
                # checking existing payloads should not be done with too many at once
                for chunk in chunks(payloads.keys(), 1000):
                    create_future(self.check_payloads, chunk, update_payloads)

                futures_wait(futures)

            # upload payloads
            failed_payloads = sum(0 if result else 1 for result in pool.map(upload_payload, payloads.items()))
            if failed_payloads > 0:
                B2ERROR(f"{failed_payloads} payloads could not be uploaded")

            # create IoVs
            failed_iovs = 0
            for entry in pool.map(create_iov, entries):
                if entry:
                    if uploaded_entries is not None:
                        uploaded_entries.append(entry)
                else:
                    failed_iovs += 1
            if failed_iovs > 0:
                B2ERROR(f"{failed_iovs} IoVs could not be created")

        # update revision numbers
        if uploaded_entries is not None:
            self.get_revisions(uploaded_entries)

        return failed_payloads + failed_iovs == 0

    def staging_request(self, filename, normalize, data, password):
        """
        Upload a testing payload storage to a staging globaltag and create or update a jira issue

        Parameters:
            filename (str): filename of the testing payload storage file that should be uploaded
            normalize (Union[bool, str]): if True the payload root files will be
                normalized to have the same checksum for the same content, if
                normalize is a string in addition the file name in the root file
                metadata will be set to it
            data (dict): a dictionary with the information provided by the user:

              * task: category of globaltag, either main, online, prompt, data, mc, or analysis
              * tag: the globaltag name
              * request: type of request, either Update, New, or Modification. The latter two imply task == main because
                if new payload classes are introduced or payload classes are modified then they will first be included in
                the main globaltag. Here a synchronization of code and payload changes has to be managed.
                If new or modified payload classes should be included in other globaltags they must already be in a release.
              * pull-request: number of the pull request containing new or modified payload classes,
                only for request == New or Modified
              * backward-compatibility: description of what happens if the old payload is encountered by the updated code,
                only for request == Modified
              * forward-compatibility: description of what happens if a new payload is encountered by the existing code,
                only for request == Modified
              * release: the required release version
              * reason: the reason for the request
              * description: a detailed description for the globaltag manager
              * issue: identifier of an existing jira issue (optional)
              * user: name of the user
              * time: time stamp of the request

              The default issue text additionally reads the key ``details`` and,
              for task == online, ``data_taking``.

            password: the password for access to jira or the access token and secret for oauth access

        Returns:
            True if the upload and jira issue creation/upload was successful
        """

        # determine the staging globaltag name
        data['tag'] = upload_global_tag(data['task'])
        if data['tag'] is None:
            data['tag'] = f"temp_{data['task']}_{data['user']}_{data['time']}"

        # create the staging globaltag if it does not exists yet
        if not self.has_globalTag(data['tag']):
            if not self.create_globalTag(data['tag'], data['reason'], data['user']):
                return False

        # upload the payloads
        B2INFO(f"Uploading testing database {filename} to globaltag {data['tag']}")
        entries = []
        if not self.upload(filename, data['tag'], normalize, uploaded_entries=entries):
            return False

        # get the dictionary for the jira issue creation/update. The issue
        # identifier is optional so do not insist on the key being present
        if data.get('issue'):
            issue = data['issue']
        else:
            issue = jira_global_tag_v2(data['task'])
            if issue is None:
                issue = {"components": [{"name": "globaltag"}]}

        # create jira issue text from provided information
        if isinstance(issue, tuple):
            # (issue configuration, description template)
            description = issue[1].format(**data)
            issue = issue[0]
        else:
            description = f"""
|*Upload globaltag* | {data['tag']} |
|*Request reason* | {data['reason']} |
|*Required release* | {data['release']} |
|*Type of request* | {data['request']} |
"""
            if 'pull-request' in data:
                description += f"|*Pull request* | \\#{data['pull-request']} |\n"
            if 'backward-compatibility' in data:
                description += f"|*Backward compatibility* | \\#{data['backward-compatibility']} |\n"
            if 'forward-compatibility' in data:
                description += f"|*Forward compatibility* | \\#{data['forward-compatibility']} |\n"
            description += '|*Details* |' + ''.join(data['details']) + ' |\n'
            if data['task'] == 'online':
                description += '|*Impact on data taking*|' + ''.join(data['data_taking']) + ' |\n'

        # add information about uploaded payloads/IoVs
        description += '\nPayloads\n||Name||Revision||IoV||\n'
        for entry in entries:
            description += f"|{entry.module} | {entry.revision} | ({entry.iov_str()}) |\n"

        # create a new issue
        if isinstance(issue, dict):
            issue["description"] = description
            if "summary" in issue:
                issue["summary"] = issue["summary"].format(**data)
            else:
                issue["summary"] = f"Globaltag request for {data['task']} by {data['user']} at {data['time']}"
            if "project" not in issue:
                issue["project"] = {"key": "BII"}
            if "issuetype" not in issue:
                issue["issuetype"] = {"name": "Task"}
            if data["task"] == "main":
                issue["labels"] = ["TUPPR"]

            B2INFO(f"Creating jira issue for {data['task']} globaltag request")
            if isinstance(password, str):
                # plain password: talk to jira directly
                response = requests.post('https://agira.desy.de/rest/api/latest/issue', auth=(data['user'], password),
                                         json={'fields': issue})
            else:
                # otherwise go through the helper script, optionally with token and secret
                fields = {'issue': json.dumps(issue)}
                if 'user' in data:
                    fields['user'] = data['user']
                if password:
                    fields['token'] = password[0]
                    fields['secret'] = password[1]
                response = requests.post('https://b2-master.belle2.org/cgi-bin/jira_issue.py', data=fields)
            if response.status_code in range(200, 210):
                B2INFO(f"Issue successfully created: https://agira.desy.de/browse/{response.json()['key']}")
            else:
                B2ERROR('The creation of the issue failed: ' + requests.status_codes._codes[response.status_code][0])
                return False

        # comment on an existing issue
        else:
            # Let's make sure all assignees of new issues are added as watchers
            # in that case, otherwise they might never find out
            new_issue_config = jira_global_tag_v2(data['task'])
            if isinstance(new_issue_config, dict) and "assignee" in new_issue_config:
                assignee = new_issue_config['assignee'].get('name', None)
                if assignee is not None and isinstance(password, str):
                    response = requests.post(f'https://agira.desy.de/rest/api/latest/issue/{issue}/watchers',
                                             auth=(data['user'], password), json=assignee)
                    if response.status_code in range(200, 210):
                        B2INFO(f"Added {assignee} as watcher to {issue}")
                    else:
                        B2WARNING(f"Could not add {assignee} as watcher to {issue}: {response.status_code}")

            B2INFO(f"Commenting on jira issue {issue} for {data['task']} globaltag request")
            if isinstance(password, str):
                response = requests.post('https://agira.desy.de/rest/api/latest/issue/%s/comment' % issue,
                                         auth=(data['user'], password), json={'body': description})
            else:
                # the comment is made on behalf of the requesting user, same as
                # for the creation of new issues above
                fields = {'id': issue, 'user': data['user'], 'comment': description}
                if password:
                    fields['token'] = password[0]
                    fields['secret'] = password[1]
                response = requests.post('https://b2-master.belle2.org/cgi-bin/jira_issue.py', data=fields)
            if response.status_code in range(200, 210):
                B2INFO(f"Issue successfully updated: https://agira.desy.de/browse/{issue}")
            else:
                B2ERROR('The commenting of the issue failed: ' + requests.status_codes._codes[response.status_code][0])
                return False

        return True
902 
903 
def require_database_for_test(timeout=60, base_url=None):
    """Make sure that the database is available and skip the test if not.

    This function should be called in test scripts if they are expected to fail
    if the database is down. It either returns when the database is ok or it
    will signal test_basf2 that the test should be skipped and exit

    Arguments:
        timeout (float): timeout in seconds for the connection attempt to
            each server
        base_url (str): explicit server url, if None the servers returned by
            `ConditionsDB.get_base_urls` are tried in order

    Raises:
        Exception: if BELLE2_CONDB_GLOBALTAG is set to an empty string
    """
    import sys
    if os.environ.get("BELLE2_CONDB_GLOBALTAG", None) == "":
        raise Exception("Access to the Database is disabled")
    base_url_list = ConditionsDB.get_base_urls(base_url)
    for url in base_url_list:
        try:
            # a timeout is also a RequestException so an unresponsive server
            # is treated like an unreachable one instead of blocking the test
            req = requests.request("HEAD", url.rstrip('/') + "/v2/globalTags", timeout=timeout)
            req.raise_for_status()
        except requests.RequestException as e:
            B2WARNING(f"Problem connecting to {url}:\n {e}\n Trying next server ...")
        else:
            break
    else:
        # no server answered: print the skip marker and exit
        print("TEST SKIPPED: No working database servers configured, giving up", file=sys.stderr)
        sys.exit(1)
926 
927 
def enable_debugging():
    """Switch on verbose output of python-requests to debug http connections.

    Debugging is enabled at the http.client level used underneath requests and
    urllib3 and the root logger as well as the urllib3 logger of requests are
    set to DEBUG.
    """
    import logging
    from http.client import HTTPConnection

    # makes http.client print the requests and responses it handles
    HTTPConnection.debuglevel = 1
    # logging needs to be initialized, otherwise no debug output is shown
    logging.basicConfig()
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)
    urllib3_logger = logging.getLogger("requests.packages.urllib3")
    urllib3_logger.setLevel(logging.DEBUG)
    urllib3_logger.propagate = True
def get_payloads(self, global_tag=None)
Definition: __init__.py:420
def get_iovs(self, globalTagName, payloadName=None)
Definition: __init__.py:575
def upload(self, filename, global_tag, normalize=False, ignore_existing=False, nprocess=1, uploaded_entries=None)
Definition: __init__.py:606
def get_globalTagType(self, name)
Definition: __init__.py:346
def check_payloads(self, payloads, information="payloadId")
Definition: __init__.py:444
def has_globalTag(self, name)
Definition: __init__.py:324
def set_authentication(self, user, password, basic=True)
Definition: __init__.py:241
def create_iov(self, globalTagId, payloadId, firstExp, firstRun, finalExp, finalRun)
Definition: __init__.py:540
def request(self, method, url, message=None, *args, **argk)
Definition: __init__.py:253
def __init__(self, base_url=None, max_connections=10, retries=3)
Definition: __init__.py:194
def get_revisions(self, entries)
Definition: __init__.py:476
def create_globalTag(self, name, description, user)
Definition: __init__.py:365
def get_globalTagInfo(self, name)
Definition: __init__.py:334
_session
session object to get keep-alive support and connection pooling
Definition: __init__.py:205
def get_all_iovs(self, globalTag, exp=None, run=None, message=None)
Definition: __init__.py:378
def get_base_urls(given_url)
Definition: __init__.py:163
def staging_request(self, filename, normalize, data, password)
Definition: __init__.py:751
def create_payload(self, module, filename, checksum=None)
Definition: __init__.py:497
_base_url
base url to be prepended to all requests
Definition: __init__.py:224
payload_id
payload id in CDB, not used for comparisons
Definition: __init__.py:96
iov_id
iov id in CDB, not used for comparisons
Definition: __init__.py:98
name
name of the payload
Definition: __init__.py:88
checksum
checksum of the payload
Definition: __init__.py:90
def __init__(self, payload_id, name, revision, checksum, payload_url, base_url, iov_id=None, iov=None)
Definition: __init__.py:83
def from_json(cls, payload, iov=None)
Definition: __init__.py:62
revision
revision, not used for comparisons
Definition: __init__.py:94
iov
interval of validity
Definition: __init__.py:92