Belle II Software  release-06-02-00
runningupdate.py
1 #!/usr/bin/env python3
2 
3 
10 
11 """
12 This module contains some classes to check the possibility and calculate the
13 necessary updates for a running globaltag
14 """
15 
16 from enum import Enum
17 from collections import defaultdict
18 from basf2 import B2ERROR, B2WARNING, B2INFO # noqa
19 from conditions_db import ConditionsDB
20 from conditions_db.iov import IntervalOfValidity, IoVSet
21 
22 
class RunningTagUpdateMode(Enum):
    """Define the different modes for the update of a running tag"""

    #: Strictest mode: the updater does not allow closed iovs in the staging
    #: tag, so every payload in staging has to be open ended
    STRICT = 1

    #: Allow payloads in the staging tag to be closed, a closed payload only
    #: produces a warning instead of an error
    ALLOW_CLOSED = 2

    #: Allow closed payloads in the staging tag but extend the last iov of
    #: each payload to infinity
    FIX_CLOSED = 3

    #: Simple mode: the staging tag has to consist of exactly one
    #: (0,0,-1,-1) iov per payload name
    SIMPLE = 4

    #: Close all open payloads in the running tag, even those not present in
    #: the staging tag. Closed iovs in staging are allowed in this mode.
    #: (the value 5 is not used anywhere in this file)
    FULL_REPLACEMENT = 6
41 
42 
class RunningTagUpdaterError(Exception):
    """
    Errors raised when trying to update the running globaltag. Can have extra
    variables in `extra_vars` to be shown to the user for additional information
    """

    def __init__(self, description, **extra_vars):
        """Initialize the class.

        Arguments:
            description (str): error message, passed on to `Exception`
            **extra_vars: arbitrary keyword arguments with additional
                information about the error, kept in `extra_vars`
        """
        super().__init__(description)
        #: extra keyword arguments given to the exception constructor
        self.extra_vars = extra_vars
53 
54 
class RunningTagUpdater:
    """
    Calculate and apply the necessary changes to update a running globaltag

    For this we take two globaltags: the running one and a staging one
    containing all the payloads and iovs to be added to the running tag. We then

    1. Make sure they are in the correct states (RUNNING and VALIDATED)
    2. Make sure all payloads in the running tag start and end (except for open
       iovs) before the given ``valid_from`` run
    3. Make sure the staging tag is overlap and gap free
    4. Make all payloads in staging start at either 0,0 or on/after the given
       ``valid_from`` run
    5. Make sure all payloads in staging are unbound unless the mode is
       ``ALLOW_CLOSED`` or ``FIX_CLOSED``. In case of ``FIX_CLOSED`` extend the
       last iov to infinity
    6. Close all payloads to be updated in the running tag that are open just
       before the validity in the staging tag.
    """

    def __init__(self, db, running, staging, valid_from, mode, dry_run=False):
        """Initialize the class

        Arguments:
            db (ConditionsDB): reference to the database object
            running (str): name of the running tag
            staging (str): name of the staging tag
            valid_from (tuple(int,int)): first valid exp,run
            mode (RunningTagUpdateMode): the mode of the update
            dry_run (bool): If true only check, don't do anything.
                But be more lenient with globaltag state of staging.

        Raises:
            RunningTagUpdaterError: if ``valid_from`` cannot be converted to
                two integers, if one of the tags cannot be found or if a tag
                is in the wrong state (the latter only outside of dry-run mode)
        """
        #: Reference to the database object to use for queries
        self._db = db

        # make sure the valid_from is a tuple of two ints
        try:
            valid_from = tuple(map(int, valid_from))
            if len(valid_from) != 2:
                raise ValueError("exp,run number needs to have two elements")
        except Exception as e:
            raise RunningTagUpdaterError("No first valid run for the update specified", error=str(e)) from e

        #: If we're in dry run mode be less critical about globaltag states
        #: (just show warnings) but refuse to do any actual update
        self._dry_run = dry_run
        #: First valid run for the update
        self._valid_from = valid_from
        #: The mode of the update
        self._mode = mode
        #: Do we allow closed iovs in staging?
        self._allow_closed = mode in (RunningTagUpdateMode.ALLOW_CLOSED, RunningTagUpdateMode.FIX_CLOSED,
                                      RunningTagUpdateMode.FULL_REPLACEMENT)
        #: Do we want to automatically open closed iovs?
        self._fix_closed = (mode == RunningTagUpdateMode.FIX_CLOSED)
        #: Dictionary mapping payload names in the staging tag to the coverage
        #: they have in the staging tag, filled by the staging tag checks
        self._staging_coverage = None
        #: Operations for the update, filled by calculate_update()
        self._operations = None

        #: Globaltag information for the running tag
        self._running_info = db.get_globalTagInfo(running)
        #: Globaltag information for the staging tag
        self._staging_info = db.get_globalTagInfo(staging)
        # make sure the tags can be found and have the correct state
        self._check_state(running, self._running_info, "RUNNING")
        self._check_state(staging, self._staging_info, "VALIDATED")

        # Get the actual payloads
        #: Payloads currently in the running tag
        self._running_payloads = db.get_all_iovs(self._running_info['name'])
        #: Payloads currently in the staging tag
        self._staging_payloads = db.get_all_iovs(self._staging_info['name'])
        #: First iov per payload name in staging to not close and open the same revision
        self._staging_first_iovs = {}

    def _check_state(self, tagname, taginfo, required):
        """Check the state of a globaltag given the tag information object returned by the database

        1) that it's found and
        2) that it has the same state as in ``required``

        In dry-run mode a wrong state only results in a warning.

        Parameters:
            tagname: name of the tag for error messages
            taginfo: tag information returned from the database, None if the tag could not be found
            required: required state for the tag.

        Raises:
            an `RunningTagUpdaterError` if any condition is not fulfilled
        """
        if taginfo is None:
            raise RunningTagUpdaterError(f"Globaltag '{tagname}' cannot be found")
        # the comparison of the states is case insensitive
        state = taginfo['globalTagStatus']['name'].upper()
        if state != required.upper():
            if self._dry_run:
                B2WARNING(f"Globaltag '{tagname}' not in {required.upper()} state, continuing to display changes")
                return
            raise RunningTagUpdaterError(f"Globaltag '{tagname}' not in {required.upper()} state", state=state)

    def _check_all(self):
        """Run all necessary checks on all globaltags"""
        # And check both tags
        self._check_running_tag(self._running_info['name'], self._running_payloads)
        # and check the staging tag
        if self._mode == RunningTagUpdateMode.SIMPLE:
            # the simple mode is so much simpler that a different method is best
            self._check_staging_tag_simple(self._staging_info['name'], self._staging_payloads)
        else:
            # all other modes are covered here
            self._check_staging_tag(self._staging_info['name'], self._staging_payloads)

    def _check_running_tag(self, tagname, payloads):
        """
        Check that all payloads in the running tag start and end (or are open)
        before the first valid run for the update

        Arguments:
            tagname (str): Name of the globaltag for error messages
            payloads: payloads of the running tag, each providing ``name`` and ``iov``

        Raises:
            RunningTagUpdaterError: if any payload starts or ends on/after the first valid run
        """
        errors = {
            "payloads start after first valid run": 0,
            "payloads end after first valid run": 0
        }
        # final run of an unbound iov, used to tell open from closed iovs
        open_final = IntervalOfValidity.always().final
        earliest_valid_from = (0, 0)
        for p in payloads:
            iov = IntervalOfValidity(p.iov)
            # starting of a validity is simple ... it needs to be below the first valid run
            if iov.first >= self._valid_from:
                B2ERROR(f"Payload in running tag '{tagname}' starts after first valid run",
                        payload=p.name, iov=p.iov, **{"first valid run": self._valid_from})
                errors["payloads start after first valid run"] += 1
            # end of a validity only matters for closed iovs, open iovs can get clipped
            elif iov.final != open_final and iov.final >= self._valid_from:
                B2ERROR(f"Payload in running tag '{tagname}' ends after first valid run",
                        payload=p.name, iov=p.iov, **{"first valid run": self._valid_from})
                errors["payloads end after first valid run"] += 1

            # keep track of the latest run touched by any start or closed end
            earliest_valid_from = max(earliest_valid_from, iov.first)
            if iov.final != open_final:
                earliest_valid_from = max(earliest_valid_from, iov.final)

        if self._dry_run:
            B2INFO("Earliest possible update of the running tag would be exp "
                   f"{earliest_valid_from[0]}, run {earliest_valid_from[1] + 1}")

        # show errors if we have any ...
        if any(errors.values()):
            raise RunningTagUpdaterError("Given first valid run conflicts with "
                                         f"running tag '{tagname}'", **errors)

    def _check_staging_tag_simple(self, tagname, payloads):
        """
        Extra simple case where we want to have a very simple staging tag just
        consisting of (0,0,-1,-1) iovs, one per payload

        Arguments:
            tagname (str): Name of the globaltag for error messages
            payloads: payloads of the staging tag, each providing ``name`` and ``iov``

        Raises:
            RunningTagUpdaterError: if a payload name occurs more than once or
                any iov is different from (0,0,-1,-1)
        """
        # This is the easy case: Make sure **ALL** iovs are the same and we only
        # have one per payload name.
        payload_names = set()
        errors = {"duplicate payloads": 0, "wrong validity": 0}
        for p in payloads:
            if p.name in payload_names:
                B2ERROR(f"Duplicate payload in staging tag '{tagname}'", name=p.name)
                errors["duplicate payloads"] += 1
            payload_names.add(p.name)
            if p.iov != (0, 0, -1, -1):
                errors["wrong validity"] += 1
                B2ERROR(f"Wrong validity for payload in staging tag '{tagname}'", name=p.name, validity=p.iov)

            # trivial, only one payload per name to remember so it's always the first
            self._staging_first_iovs[p.name] = p

        # how many errors did we have?
        if any(errors.values()):
            raise RunningTagUpdaterError(f"Staging tag '{tagname}' not fit for update in simple mode", **errors)

        # everything is fine, set the coverage to everything for all payloads ...
        always = IoVSet([IntervalOfValidity.always()])
        self._staging_coverage = {name: always for name in payload_names}

    def _check_staging_tag(self, tagname, payloads):
        """
        Check if the staging tag is
            1. overlap free
            2. gap free
            3. all payloads are open (unless the mode allows closed payloads)
            4. all payloads start at 0,0 or after the first valid run for the update

        Arguments:
            tagname (str): Name of the globaltag for error messages
            payloads (list(conditions_db.PayloadInformation)): List of payloads in the tag

        Raises:
            RunningTagUpdaterError: if any of the checks failed
        """
        # coverage for all iovs for a payload name not starting at (0,0) as those
        # will be adjusted to first valid run
        explicit_coverage = defaultdict(IoVSet)
        # full coverage of the payload to check for overlaps and gaps and later
        # use it to correctly close open iovs in the running tag
        full_coverage = defaultdict(IoVSet)
        # if we want to open iovs we need to know which was the latest
        latest_iov = {}
        # error dictionary to return to caller
        errors = {'overlaps': 0, 'gaps': 0, 'starts too early': 0, 'closed payloads': 0}
        # go through all payloads and form the union of all their validities ...
        # and check for overlaps
        for p in payloads:
            iov = IntervalOfValidity(p.iov)
            try:
                # add the iov to the full coverage, refusing any overlap
                full_coverage[p.name].add(iov, allow_overlaps=False)
            except ValueError as e:
                B2ERROR(f"Overlap in globaltag '{tagname}'", payload=p.name, overlap=e)
                errors['overlaps'] += 1
                # now add it anyways to check for more overlaps with the other
                # iovs in the tag just to give a complete list of errors
                full_coverage[p.name].add(iov, allow_overlaps=True)

            # add all iovs not starting at 0,0 to the explicit coverage of this payload
            if iov.first != (0, 0):
                explicit_coverage[p.name].add(iov, allow_overlaps=True)

            # do we need to open iovs? If so remember the latest iov for each payload
            if self._fix_closed:
                prev = latest_iov.get(p.name)
                latest_iov[p.name] = p if prev is None else max(p, prev)

            # remember the first iov of each payload to extend what's in running if they match
            first = self._staging_first_iovs.get(p.name)
            self._staging_first_iovs[p.name] = p if first is None else min(p, first)

        # final run of an unbound iov, used to tell open from closed coverage
        open_final = IntervalOfValidity.always().final
        # Ok, now check for all payloads if the resulting iov is a single one or multiple,
        # aka having gaps. In that case print the gaps
        for name, iovs in full_coverage.items():
            if len(iovs) > 1:
                B2ERROR(f"Gap in globaltag '{tagname}'", payload=name, gaps=iovs.gaps)
                errors['gaps'] += len(iovs) - 1
            # Also, make sure the iovs for the payloads are infinite and either
            # raise errors or at least show a warning
            if iovs.final != open_final:
                log_func = B2WARNING
                if not self._allow_closed:
                    log_func = B2ERROR
                    errors['closed payloads'] += 1
                log_func(f"Payload in globaltag '{tagname}' not open ended",
                         payload=name, **{"final run": iovs.final})

        # No gaps, no overlaps, but do we start at the given valid_from run?
        for name, iovs in explicit_coverage.items():
            if iovs.first < self._valid_from:
                B2ERROR(f"Payload in globaltag '{tagname}' starts before the given first valid run",
                        payload=name, **{"actual start validity": iovs.first,
                                         "expected start validity": self._valid_from})
                errors['starts too early'] += 1

        # Do we want to open iovs?
        if self._fix_closed:
            # Then do so ...
            for payload in latest_iov.values():
                if payload.iov[2:] != (-1, -1):
                    B2INFO("Extending closed iov to infinity", name=payload.name,
                           **{"old iov": payload.iov})
                    payload.iov = payload.iov[:2] + (-1, -1)
                    full_coverage[payload.name].add(payload.iov, allow_overlaps=True)

        # Any errors?
        if any(errors.values()):
            raise RunningTagUpdaterError(f"Staging tag '{tagname}' not fit for update", **errors)

        # No errors, great, remember the coverages for all payloads to select the
        # proper payloads to be closed in the running tag and where to close them
        self._staging_coverage = dict(full_coverage)

    def calculate_update(self):
        """
        Calculate the operations needed to merge staging into the running base tag

        Returns:
            list of ``[operation, payload]`` pairs where operation is either
            ``"CLOSE"`` (payload from the running tag with its iov already
            adjusted) or ``"CREATE"`` (payload from the staging tag clipped to
            the valid range)

        Raises:
            RunningTagUpdaterError: if any of the checks on the tags failed
        """
        # Check all tags and payloads
        self._check_all()
        # Ok, all checks done ...
        valid_range = IntervalOfValidity(*(self._valid_from + (-1, -1)))
        operations = []
        # we only need to close payloads that are present in staging ... unless
        # we run in FULL mode in which we close everything
        for p in self._running_payloads:
            # so check if we have a coverage in the staging tag
            staging = self._staging_coverage.get(p.name)
            # only open iovs in the running tag can be closed
            if p.iov[2:] == (-1, -1):
                if staging is None:
                    # payload not present in staging tag. Ignore unless we do full replacement
                    if self._mode != RunningTagUpdateMode.FULL_REPLACEMENT:
                        continue
                    staging_range = valid_range
                else:
                    # extend the staging to infinity to make sure we close existing payloads correctly
                    staging_range = IntervalOfValidity(*(staging.first + (-1, -1))) & valid_range

                # if the first payload didn't change revision we don't need to make
                # a new iov but can just extend the existing one
                first_iov = self._staging_first_iovs.get(p.name)
                if first_iov is not None and first_iov.revision == p.revision:
                    staging_range -= IntervalOfValidity(first_iov.iov)
                    self._staging_payloads.remove(first_iov)
                    # there's a chance this is empty now
                    if not staging_range:
                        continue
                # close the existing iov before the range covered in staging
                p.iov = (IntervalOfValidity(p.iov) - staging_range).tuple
                # and mark the iov for closing
                operations.append(["CLOSE", p])
        # and all payloads that need adjusting in staging
        for p in self._staging_payloads:
            # clip them to the valid range ... which only affects iovs starting
            # from 0,0. Everything else would have raised an error
            p.iov = (IntervalOfValidity(p.iov) & valid_range).tuple
            # and add them to the list
            operations.append(["CREATE", p])

        # remember the operations in case we want to apply them
        self._operations = operations
        return self._operations

    def apply_update(self):
        """Apply a previously calculated update to the globaltag

        Warning:
            This action cannot be undone, only call it after checking the
            operations returned by the calculation of the update

        Raises:
            RunningTagUpdaterError: in dry-run mode, if no update has been
                calculated yet, for unknown operation types or if the request
                to the database fails
        """
        if self._dry_run:
            raise RunningTagUpdaterError("Called in dry-run mode, refusing to cooperate")

        if self._operations is None:
            raise RunningTagUpdaterError("Update needs to be calculated first")

        # translate the calculated operations into the list sent to the server
        operations = []
        for op, payload in self._operations:
            if op == "CLOSE":
                # closing only modifies the final exp,run of an existing iov
                operations.append({"operation": "MODIFY", "data": [payload.iov_id] + list(payload.iov[2:])})
            elif op == "CREATE":
                operations.append({"operation": "CREATE", "data": [payload.payload_id] + list(payload.iov)})
            else:
                raise RunningTagUpdaterError(f"Unknown operation type: {op}")

        # nothing to do, so don't bother the server
        if not operations:
            return

        tag = self._running_info['name']
        try:
            self._db.request("POST", f"/globalTagPayload/{tag}/updateRunningPayloads",
                             f"updating running tag {tag}", json=operations)
        except ConditionsDB.RequestError as e:
            raise RunningTagUpdaterError(f"Cannot update running tag {tag}", error=e) from e
extra_vars
extra keyword arguments given to the exception constructor
def __init__(self, description, **extra_vars)
Initialize the class.
_running_payloads
Payloads currently in the running tag.
def _check_staging_tag(self, tagname, payloads)
_valid_from
First valid run for the update.
_mode
True if we want to allow payloads in the staging tag to be closed, for example when retiring a paylo...
_staging_payloads
Payloads currently in the staging tag.
_staging_info
Globaltag information for the staging tag.
def _check_running_tag(self, tagname, payloads)
_staging_coverage
Dictionary mapping payload names in the staging tag to the coverage they have in the staging tag,...
_staging_first_iovs
First iov per payload name in staging to not close and open the same revision.
_allow_closed
Do we allow closed iovs in staging?
_running_info
Globaltag information for the running tag.
_fix_closed
Do we want to automatically open closed iovs?
def __init__(self, db, running, staging, valid_from, mode, dry_run=False)
def _check_state(self, tagname, taginfo, required)
_operations
Operations for the update, filled by calculate_update()
_dry_run
If we're in dry run mode be less critical about globaltag states (just show warnings) but refuse to d...
def _check_staging_tag_simple(self, tagname, payloads)
_db
Reference to the database object to use for queries.