Belle II Software  release-08-01-10
runningupdate.py
1 #!/usr/bin/env python3
2 
3 
10 
11 """
12 This module contains some classes to check the possibility and calculate the
13 necessary updates for a running globaltag
14 """
15 
16 from enum import Enum
17 from collections import defaultdict
18 from basf2 import B2ERROR, B2WARNING, B2INFO # noqa
19 from conditions_db import ConditionsDB
20 from conditions_db.iov import IntervalOfValidity, IoVSet
21 
22 
24  """Define the different modes for the update of a running tag"""
25 
28  STRICT = 1
29 
30  ALLOW_CLOSED = 2
31 
32  FIX_CLOSED = 3
33 
35  SIMPLE = 4
36 
40  FULL_REPLACEMENT = 6
41 
42 
43 class RunningTagUpdaterError(Exception):
44  """
45  Errors raised when trying to update the running globaltag. Can have extra
46  variables in `extra_vars` to be shown to the user for additional information
47  """
48 
49 
50  def __init__(self, description, **extra_vars):
51  super().__init__(description)
52 
53  self.extra_varsextra_vars = extra_vars
54 
55 
57  """
58  Calculate and apply the necessary changes to update a running globaltag
59 
60  For this we take two globaltags: the running one and a staging one
61  containing all the payloads and iovs to be added to the running tag. We then
62 
63  1. Make sure they are in the correct states (RUNNING and VALIDATED)
64  2. Make sure all payloads in the running tag start and end (except for open
65  iovs) before the given ``valid_from`` run
66  3. Make sure the staging tag is overlap and gap free
67  4. Make all payloads in staging start at either 0,0 or on/after the given
68  ``valid_from`` run
69  5. Make sure all payloads in staging are unbound unless the mode is
70  ``ALLOW_CLOSED`` or ``FIX_CLOSED``. In case of ``FIX_CLOSED`` extend the
71  last iov to infinity
72  6. Close all payloads to be updated in the running tag that are open just
73  before the validity in the staging tag.
74  """
75 
76  def __init__(self, db, running, staging, valid_from, mode, dry_run=False):
77  """Initialize the class
78 
79  Arguments:
80  db (ConditionsDB): reference to the database object
81  running (str): name of the running tag
82  staging (str): name of the staging tag
83  valid_from (tuple(int,int)): first valid exp,run
84  mode (RunningTagUpdateMode): the mode of the update
85  dry_run (bool): If true only check, don't do anything.
86  But be more lenient with globaltag state of staging.
87  """
88 
89  self._db_db = db
90 
91  # make sure the valid_from is a tuple of two ints
92  try:
93  valid_from = tuple(map(int, valid_from))
94  if len(valid_from) != 2:
95  raise ValueError("exp,run number needs to have two elements")
96  except Exception as e:
97  raise RunningTagUpdaterError("No first valid run for the update specified", error=str(e)) from e
98 
99 
101  self._dry_run_dry_run = dry_run
102 
103  self._valid_from_valid_from = valid_from
104 
105 
107  self._mode_mode = mode
108 
109  self._allow_closed_allow_closed = mode in (RunningTagUpdateMode.ALLOW_CLOSED, RunningTagUpdateMode.FIX_CLOSED,
110  RunningTagUpdateMode.FULL_REPLACEMENT)
111 
112  self._fix_closed_fix_closed = (mode == RunningTagUpdateMode.FIX_CLOSED)
113 
115  self._staging_coverage_staging_coverage = None
116 
117  self._operations_operations = None
118 
119 
120  self._running_info_running_info = db.get_globalTagInfo(running)
121 
122  self._staging_info_staging_info = db.get_globalTagInfo(staging)
123  # make sure the tags can be found and have the correct state
124  self._check_state_check_state(running, self._running_info_running_info, "RUNNING")
125  self._check_state_check_state(staging, self._staging_info_staging_info, "VALIDATED")
126 
127  # Get the actual payloads
128 
129  self._running_payloads_running_payloads = db.get_all_iovs(self._running_info_running_info['name'])
130 
131  self._staging_payloads_staging_payloads = db.get_all_iovs(self._staging_info_staging_info['name'])
132 
133  self._staging_first_iovs_staging_first_iovs = {}
134 
135  def _check_state(self, tagname, taginfo, required):
136  """Check the state of a globaltag given the tag information object returned by the database
137 
138  1) that it's found and
139  2) that it has the same state as in ``required``
140 
141  Parameters:
142  tagname: name of the tag for error messages
143  taginfo: tag information returned from the database, None if the tag could not be found
144  required: required state for the tag.
145 
146  Raises:
147  an `RunningTagUpdaterError` if any condition is not fulfilled
148  """
149  if taginfo is None:
150  raise RunningTagUpdaterError(f"Globaltag '{tagname}' cannot be found")
151  state = taginfo['globalTagStatus']['name'].upper()
152  if state != required.upper():
153  if self._dry_run_dry_run:
154  B2WARNING(f"Globaltag '{tagname}' not in {required.upper()} state, continuing to display changes")
155  return
156  raise RunningTagUpdaterError(f"Globaltag '{tagname}' not in {required.upper()} state", state=state)
157 
158  def _check_all(self):
159  """Run all necessary checks on all globaltags"""
160  # And check both tags
161  self._check_running_tag_check_running_tag(self._running_info_running_info['name'], self._running_payloads_running_payloads)
162  # and check the staging tag
163  if self._mode_mode == RunningTagUpdateMode.SIMPLE:
164  # the simple mode is so much simpler that a different method is best
165  self._check_staging_tag_simple_check_staging_tag_simple(self._staging_info_staging_info['name'], self._staging_payloads_staging_payloads)
166  else:
167  # all other modes are covered here
168  self._check_staging_tag_check_staging_tag(self._staging_info_staging_info['name'], self._staging_payloads_staging_payloads)
169 
170  def _check_running_tag(self, tagname, payloads):
171  """
172  Check that all payloads in the running tag start and end (or are open)
173  before the first valid run for the update
174  """
175  errors = {
176  "payloads start after first valid run": 0,
177  "payloads end after first valid run": 0
178  }
179  earliest_valid_from = (0, 0)
180  for p in payloads:
181  iov = IntervalOfValidity(p.iov)
182  # starting of a validity is simple ... it needs to be below the first valid run
183  if iov.first >= self._valid_from_valid_from:
184  B2ERROR(f"Payload in running tag '{tagname}' starts after first valid run",
185  payload=p.name, iov=p.iov, **{"first valid run": self._valid_from_valid_from})
186  errors["payloads start after first valid run"] += 1
187  # end of a validity only matters for closed iovs, open iovs can get clipped
188  elif iov.final != IntervalOfValidity.always().final and iov.final >= self._valid_from_valid_from:
189  B2ERROR(f"Payload in running tag '{tagname}' ends after first valid run",
190  payload=p.name, iov=p.iov, **{"first valid run": self._valid_from_valid_from})
191  errors["payloads end after first valid run"] += 1
192 
193  earliest_valid_from = max(earliest_valid_from, iov.first)
194  if iov.final != IntervalOfValidity.always().final:
195  earliest_valid_from = max(earliest_valid_from, iov.final)
196 
197  if self._dry_run_dry_run:
198  B2INFO("Earliest possible update of the running tag would be exp "
199  f"{earliest_valid_from[0]}, run {earliest_valid_from[1] + 1}")
200 
201  # show errors if we have any ...
202  if any(errors.values()):
203  raise RunningTagUpdaterError("Given first valid run conflicts with "
204  f"running tag '{tagname}'", **errors)
205 
206  def _check_staging_tag_simple(self, tagname, payloads):
207  """
208  Extra simple case where we want to have a very simple staging tag just
209  consisting of (0,0,-1,-1) iovs, one per payload
210  """
211  # This is the easy case: Make sure **ALL** iovs are the same and we only
212  # have one per payload name.
213  payload_names = set()
214  errors = {"duplicate payloads": 0, "wrong validity": 0}
215  for p in payloads:
216  if p.name in payload_names:
217  B2ERROR(f"Duplicate payload in staging tag '{tagname}'", name=p.name)
218  errors["duplicate payloads"] += 1
219  payload_names.add(p.name)
220  if p.iov != (0, 0, -1, -1):
221  errors["wrong validity"] += 1
222  B2ERROR(f"Wrong validity for payload in staging tag '{tagname}'", name=p.name, validity=p.iov)
223 
224  # trivial, only one payload per name to remember so it's always the first
225  self._staging_first_iovs_staging_first_iovs[p.name] = p
226 
227  # how many errors did we have?
228  if any(errors.values()):
229  raise RunningTagUpdaterError(f"Staging tag '{tagname}' not fit for update in simple mode", **errors)
230 
231  # everything is fine, set the coverage to everything for all payloads ...
232  always = IoVSet([IntervalOfValidity.always()])
233  self._staging_coverage_staging_coverage = {name: always for name in payload_names}
234 
235  def _check_staging_tag(self, tagname, payloads):
236  """
237  Check if the staging tag is
238  1. overlap free
239  2. gap free
240  3. all payloads are open (unless the mode allows closed payloads)
241  4. all payloads start at 0,0 or after the first valid run for the update
242 
243  Arguments:
244  tagname (str): Name of the globaltag for error messages
245  payloads (list(conditions_db.PayloadInformation)): List of payloads in the tag
246  """
247  # coverage for all iovs for a payload name not starting at (0,0) as those
248  # will be adjusted to first valid run
249  explicit_coverage = defaultdict(IoVSet)
250  # full coverage of the payload to check for overlaps and gaps and later
251  # use it to correctly close open iovs in the running tag
252  full_coverage = defaultdict(IoVSet)
253  # if we want to open iovs we need to know which was the latest
254  latest_iov = {}
255  # error dictionary to return to caller
256  errors = {'overlaps': 0, 'gaps': 0, 'starts too early': 0, 'closed payloads': 0}
257  # go through all payloads and form the union of all their validities ...
258  # and check for overlaps
259  for p in payloads:
260  iov = IntervalOfValidity(p.iov)
261  try:
262  # add the iov to the
263  full_coverage[p.name].add(iov, allow_overlaps=False)
264  except ValueError as e:
265  B2ERROR(f"Overlap in globaltag '{tagname}'", payload=p.name, overlap=e)
266  errors['overlaps'] += 1
267  # now add it anyways to check for more overlaps with the other
268  # iovs in the tag just to give a complete list of errors
269  full_coverage[p.name].add(iov, allow_overlaps=True)
270 
271  # add all iovs not starting at 0,0 to the explicit coverage of this payload
272  if iov.first != (0, 0):
273  explicit_coverage[p.name].add(iov, allow_overlaps=True)
274 
275  # do we need to open iovs? If so remember the latest iov for each payload
276  if self._fix_closed_fix_closed:
277  prev = latest_iov.get(p.name, None)
278  if prev is None:
279  latest_iov[p.name] = p
280  else:
281  latest_iov[p.name] = max(p, prev)
282 
283  # remember the first iov of each payload to extend what's in running if they match
284  first = self._staging_first_iovs_staging_first_iovs.get(p.name, None)
285  if first is None:
286  self._staging_first_iovs_staging_first_iovs[p.name] = p
287  else:
288  self._staging_first_iovs_staging_first_iovs[p.name] = min(p, first)
289 
290  # Ok, now check for all payloads if the resulting iov is a single one or multiple,
291  # aka having gaps. In that case print the gaps
292  for name, iovs in full_coverage.items():
293  if len(iovs) > 1:
294  B2ERROR(f"Gap in globaltag '{tagname}'", payload=name, gaps=iovs.gaps)
295  errors['gaps'] += len(iovs) - 1
296  # Also, make sure the iovs for the payloads are infinite and either
297  # raise errors or at least show a warning
298  if iovs.final != IntervalOfValidity.always().final:
299  log_func = B2WARNING
300  if not self._allow_closed_allow_closed:
301  log_func = B2ERROR
302  errors['closed payloads'] += 1
303  log_func(f"Payload in globaltag '{tagname}' not open ended",
304  payload=name, **{"final run": iovs.final})
305 
306  # No gaps, no overlaps, but do we start at the given valid_from run?
307  for name, iovs in explicit_coverage.items():
308  if iovs.first < self._valid_from_valid_from:
309  B2ERROR(f"Payload in globaltag '{tagname}' starts before the given first valid run",
310  payload=name, **{"actual start validity": iovs.first,
311  "expected start validity": self._valid_from_valid_from})
312  errors['starts too early'] += 1
313 
314  # Do we want to open iovs?
315  if self._fix_closed_fix_closed:
316  # Then do so ...
317  for payload in latest_iov.values():
318  if payload.iov[2:] != (-1, -1):
319  B2INFO("Extending closed iov to infinity", name=payload.name,
320  **{"old iov": payload.iov})
321  payload.iov = payload.iov[:2] + (-1, -1)
322  full_coverage[payload.name].add(payload.iov, allow_overlaps=True)
323 
324  # Any errors?
325  if any(errors.values()):
326  raise RunningTagUpdaterError(f"Staging tag '{tagname}' not fit for update", **errors)
327 
328  # No errors, great, remember the coverages for all payloads to select the
329  # proper payloads to be closed in the running tag and where to close them
330  self._staging_coverage_staging_coverage = dict(full_coverage)
331 
332  def calculate_update(self):
333  """
334  Calculate the operations needed to merge staging into the running base tag
335  """
336  # Check all tags and payloads
337  self._check_all_check_all()
338  # Ok, all checks done ...
339  valid_range = IntervalOfValidity(*(self._valid_from_valid_from + (-1, -1)))
340  operations = []
341  # we only need to close payloads that are present in staging ... unless
342  # we run in FULL mode in which we close everything
343  for p in self._running_payloads_running_payloads:
344  # so check if we have a coverage in the staging tag
345  staging = self._staging_coverage_staging_coverage.get(p.name, None)
346  if p.iov[2:] == (-1, -1):
347  if staging is None:
348  # payload not present in staging tag. Ignore unless we do full replacement
349  if not self._mode_mode == RunningTagUpdateMode.FULL_REPLACEMENT:
350  continue
351  staging_range = valid_range
352  else:
353  # extend the staging to infinity to make sure we close existing payloads correctly
354  staging_range = IntervalOfValidity(*(staging.first + (-1, -1))) & valid_range
355 
356  # if the first payload didn't change revision we don't need to make
357  # a new iov but can just extend the existing one
358  first_iov = self._staging_first_iovs_staging_first_iovs.get(p.name, None)
359  if first_iov is not None and first_iov.revision == p.revision:
360  staging_range -= IntervalOfValidity(first_iov.iov)
361  self._staging_payloads_staging_payloads.remove(first_iov)
362  # there's a chance this is empty now
363  if not staging_range:
364  continue
365  # close the existing iov before the range covered in staging
366  p.iov = (IntervalOfValidity(p.iov) - staging_range).tuple
367  # and mark the iov for closing
368  operations.append(["CLOSE", p])
369  # and all payloads that need adjusting in staging
370  for p in self._staging_payloads_staging_payloads:
371  # clip them to the valid range ... which only affects iovs starting
372  # from 0,0. Everything else would have raised an error
373  p.iov = (IntervalOfValidity(p.iov) & valid_range).tuple
374  # and add them to the list
375  operations.append(["CREATE", p])
376 
377  # remember the operations in case we want to apply them
378  self._operations_operations = operations
379  return self._operations_operations
380 
381  def apply_update(self):
382  """Apply a previously calculated update to the globaltag
383 
384  Warning:
385  This action cannot be undone, only call it after checking the
386  operations returned by the calculation of the update
387  """
388  if self._dry_run_dry_run:
389  raise RunningTagUpdaterError("Called in dry-run mode, refusing to cooperate")
390 
391  if self._operations_operations is None:
392  raise RunningTagUpdaterError("Update needs to be calculated first")
393 
394  operations = []
395  for op, payload in self._operations_operations:
396  if op == "CLOSE":
397  operations.append({"operation": "MODIFY", "data": [payload.iov_id] + list(payload.iov[2:])})
398  elif op == "CREATE":
399  operations.append({"operation": "CREATE", "data": [payload.payload_id] + list(payload.iov)})
400  else:
401  raise RunningTagUpdaterError(f"Unknown operation type: {op}")
402 
403  if not operations:
404  return
405 
406  tag = self._running_info_running_info['name']
407  try:
408  self._db_db.request("POST", f"/globalTagPayload/{tag}/updateRunningPayloads",
409  f"updating running tag {tag}", json=operations)
410  except ConditionsDB.RequestError as e:
411  raise RunningTagUpdaterError(f"Cannot update running tag {tag}", error=e) from e
extra_vars
extra keyword arguments given to the exception constructor
def __init__(self, description, **extra_vars)
Initialize the class.
_running_payloads
Payloads currently in the running tag.
def _check_staging_tag(self, tagname, payloads)
_valid_from
First valid run for the update.
_mode
True if we want to allow payloads in the staging tag to be closed, for example when retiring a payloa...
_staging_payloads
Payloads currently in the staging tag.
_staging_info
Globaltag information for the staging tag.
def _check_running_tag(self, tagname, payloads)
_staging_coverage
Dictionary mapping payload names in the staging tag to the coverage they have in the staging tag,...
_staging_first_iovs
First iov per payload name in staging to not close and open the same revision.
_allow_closed
Do we allow closed iovs in staging?
_running_info
Globaltag information for the running tag.
_fix_closed
Do we want to automatically open closed iovs?
def __init__(self, db, running, staging, valid_from, mode, dry_run=False)
def _check_state(self, tagname, taginfo, required)
_operations
Operations for the update, filled by calculate_update()
_dry_run
If we're in dry run mode be less critical about globaltag states (just show warnings) but refuse to d...
def _check_staging_tag_simple(self, tagname, payloads)
_db
Reference to the database object to use for queries.