Belle II Software  release-05-01-25
runningupdate.py
1 #!/usr/bin/env python3
2 
3 """
4 This module contains some classes to check the possibility and calculate the
5 necessary updates for a running globaltag
6 """
7 
8 from enum import Enum
9 from collections import defaultdict
10 from basf2 import B2ERROR, B2WARNING, B2INFO # noqa
11 from . import ConditionsDB
12 from .iov import IntervalOfValidity, IoVSet
13 
14 
16  """Define the different modes for the update of a running tag"""
17 
20  STRICT = 1
21 
22  ALLOW_CLOSED = 2
23 
24  FIX_CLOSED = 3
25 
27  SIMPLE = 4
28 
32  FULL_REPLACEMENT = 6
33 
34 
35 class RunningTagUpdaterError(Exception):
36  """
37  Errors raised when trying to update the running globaltag. Can have extra
38  variables in `extra_vars` to be shown to the user for additional information
39  """
40 
41  def __init__(self, description, **extra_vars):
42  super().__init__(description)
43 
44  self.extra_vars = extra_vars
45 
46 
48  """
49  Calculate and apply the necessary changes to update a running globaltag
50 
51  For this we take two globaltags: the running one and a staging one
52  containing all the payloads and iovs to be added to the running tag. We then
53 
54  1. Make sure they are in the correct states (RUNNING and VALIDATED)
55  2. Make sure all payloads in the running tag start and end (except for open
56  iovs) before the given ``valid_from`` run
57  3. Make sure the staging tag is overlap and gap free
58  4. Make all payloads in staging start at either 0,0 or on/after the given
59  ``valid_from`` run
60  5. Make sure all payloads in staging are unbound unless the mode is
61  ``ALLOW_CLOSED`` or ``FIX_CLOSED``. In case of ``FIX_CLOSED`` extend the
62  last iov to infinity
63  6. Close all payloads to be updated in the running tag that are open just
64  before the validity in the staging tag.
65  """
66 
67  def __init__(self, db, running, staging, valid_from, mode):
68 
69  self._db = db
70 
71  # make sure the valid_from is a tuple of two ints
72  try:
73  valid_from = tuple(map(int, valid_from))
74  if len(valid_from) != 2:
75  raise ValueError("exp,run number needs to have two elements")
76  except Exception as e:
77  raise RunningTagUpdaterError("No first valid run for the update specified", error=str(e)) from e
78 
79 
80  self._valid_from = valid_from
81 
82 
84  self._mode = mode
85 
86  self._allow_closed = mode in (RunningTagUpdateMode.ALLOW_CLOSED, RunningTagUpdateMode.FIX_CLOSED,
87  RunningTagUpdateMode.FULL_REPLACEMENT)
88 
89  self._fix_closed = (mode == RunningTagUpdateMode.FIX_CLOSED)
90 
92  self._staging_coverage = None
93 
94  self._operations = None
95 
96 
97  self._running_info = db.get_globalTagInfo(running)
98 
99  self._staging_info = db.get_globalTagInfo(staging)
100  # make sure the tags can be found and have the correct state
101  self._check_state(running, self._running_info, "RUNNING")
102  self._check_state(staging, self._staging_info, "VALIDATED")
103 
104  # Get the actual payloads
105 
106  self._running_payloads = db.get_all_iovs(self._running_info['name'])
107 
108  self._staging_payloads = db.get_all_iovs(self._staging_info['name'])
109 
111 
112  def _check_state(self, tagname, taginfo, required):
113  """Check the state of a globaltag given the tag information object returned by the database
114 
115  1) that it's found and
116  2) that it has the same state as in ``required``
117 
118  Parameters:
119  tagname: name of the tag for error messages
120  taginfo: tag information returned from the database, None if the tag could not be found
121  required: required state for the tag.
122 
123  Raises:
124  an `RunningTagUpdaterError` if any condition is not fulfilled
125  """
126  if taginfo is None:
127  raise RunningTagUpdaterError(f"Globaltag '{tagname}' cannot be found")
128  state = taginfo['globalTagStatus']['name'].upper()
129  if state != required.upper():
130  raise RunningTagUpdaterError(f"Globaltag '{tagname}' not in {required.upper()} state", state=state)
131 
132  def _check_all(self):
133  """Run all necessary checks on all globaltags"""
134  # And check both tags
135  self._check_running_tag(self._running_info['name'], self._running_payloads)
136  # and check the staging tag
137  if self._mode == RunningTagUpdateMode.SIMPLE:
138  # the simple mode is so much simpler that a different method is best
140  else:
141  # all other modes are covered here
142  self._check_staging_tag(self._staging_info['name'], self._staging_payloads)
143 
144  def _check_running_tag(self, tagname, payloads):
145  """
146  Check that all payloads in the running tag start and end (or are open)
147  before the first valid run for the update
148  """
149  errors = {
150  "payloads start after first valid run": 0,
151  "payloads end after first valid run": 0
152  }
153  for p in payloads:
154  iov = IntervalOfValidity(p.iov)
155  # starting of a validity is simple ... it needs to be below the first valid run
156  if iov.first >= self._valid_from:
157  B2ERROR(f"Payload in running tag '{tagname}' starts after first valid run",
158  payload=p.name, iov=p.iov, **{"first valid run": self._valid_from})
159  errors["payloads start after first valid run"] += 1
160  # end of a validity only matters for closed iovs, open iovs can get clipped
161  elif iov.final != IntervalOfValidity.always().final and iov.final >= self._valid_from:
162  B2ERROR(f"Payload in running tag '{tagname}' ends after first valid run",
163  payload=p.name, iov=p.iov, **{"first valid run": self._valid_from})
164  errors["payloads end after first valid run"] += 1
165 
166  # show errors if we have any ...
167  if any(errors.values()):
168  raise RunningTagUpdaterError("Given first valid run conflicts with "
169  f"running tag '{tagname}'", **errors)
170 
171  def _check_staging_tag_simple(self, tagname, payloads):
172  """
173  Extra simple case where we want to have a very simple staging tag just
174  consisting of (0,0,-1,-1) iovs, one per payload
175  """
176  # This is the easy case: Make sure **ALL** iovs are the same and we only
177  # have one per payload name.
178  payload_names = set()
179  errors = {"duplicate payloads": 0, "wrong validity": 0}
180  for p in payloads:
181  if p.name in payload_names:
182  B2ERROR(f"Duplicate payload in staging tag '{tagname}'", name=p.name)
183  errors["duplicate payloads"] += 1
184  payload_names.add(p.name)
185  if p.iov != (0, 0, -1, -1):
186  errors["wrong validity"] += 1
187  B2ERROR(f"Wrong validity for payload in staging tag '{tagname}'", name=p.name, validity=p.iov)
188 
189  # trivial, only one payload per name to remember so it's always the first
190  self._staging_first_iovs[p.name] = p
191 
192  # how many errors did we have?
193  if any(errors.values()):
194  raise RunningTagUpdaterError(f"Staging tag '{tagname}' not fit for update in simple mode", **errors)
195 
196  # everything is fine, set the coverage to everything for all payloads ...
197  always = IoVSet([IntervalOfValidity.always()])
198  self._staging_coverage = {name: always for name in payload_names}
199 
200  def _check_staging_tag(self, tagname, payloads):
201  """
202  Check if the staging tag is
203  1. overlap free
204  2. gap free
205  3. all payloads are open (unless the mode allows closed payloads)
206  4. all payloads start at 0,0 or after the first valid run for the update
207 
208  Arguments:
209  tagname (str): Name of the globaltag for error messages
210  payloads (list(conditions_db.PayloadInformation)): List of payloads in the tag
211  """
212  # coverage for all iovs for a payload name not starting at (0,0) as those
213  # will be adjusted to first valid run
214  explicit_coverage = defaultdict(IoVSet)
215  # full coverage of the payload to check for overlaps and gaps and later
216  # use it to correctly close open iovs in the running tag
217  full_coverage = defaultdict(IoVSet)
218  # if we want to open iovs we need to know which was the latest
219  latest_iov = {}
220  # error dictionary to return to caller
221  errors = {'overlaps': 0, 'gaps': 0, 'starts too early': 0, 'closed payloads': 0}
222  # go through all payloads and form the union of all their validities ...
223  # and check for overlaps
224  for p in payloads:
225  iov = IntervalOfValidity(p.iov)
226  try:
227  # add the iov to the
228  full_coverage[p.name].add(iov, allow_overlaps=False)
229  except ValueError as e:
230  B2ERROR(f"Overlap in globaltag '{tagname}'", payload=p.name, overlap=e)
231  errors['overlaps'] += 1
232  # now add it anyways to check for more overlaps with the other
233  # iovs in the tag just to give a complete list of errors
234  full_coverage[p.name].add(iov, allow_overlaps=True)
235 
236  # add all iovs not starting at 0,0 to the explicit coverage of this payload
237  if iov.first != (0, 0):
238  explicit_coverage[p.name].add(iov, allow_overlaps=True)
239 
240  # do we need to open iovs? If so remember the latest iov for each payload
241  if self._fix_closed:
242  prev = latest_iov.get(p.name, None)
243  if prev is None:
244  latest_iov[p.name] = p
245  else:
246  latest_iov[p.name] = max(p, prev)
247 
248  # remember the first iov of each payload to extend what's in running if they match
249  first = self._staging_first_iovs.get(p.name, None)
250  if first is None:
251  self._staging_first_iovs[p.name] = p
252  else:
253  self._staging_first_iovs[p.name] = min(p, first)
254 
255  # Ok, now check for all payloads if the resulting iov is a single one or multiple,
256  # aka having gaps. In that case print the gaps
257  for name, iovs in full_coverage.items():
258  if len(iovs) > 1:
259  B2ERROR(f"Gap in globaltag '{tagname}'", payload=name, gaps=iovs.gaps)
260  errors['gaps'] += len(iovs) - 1
261  # Also, make sure the iovs for the payloads are infinite and either
262  # raise errors or at least show a warning
263  if iovs.final != IntervalOfValidity.always().final:
264  log_func = B2WARNING
265  if not self._allow_closed:
266  log_func = B2ERROR
267  errors['closed payloads'] += 1
268  log_func(f"Payload in globaltag '{tagname}' not open ended",
269  payload=name, **{"final run": iovs.final})
270 
271  # No gaps, no overlaps, but do we start at the given valid_from run?
272  for name, iovs in explicit_coverage.items():
273  if iovs.first < self._valid_from:
274  B2ERROR(f"Payload in globaltag '{tagname}' starts before the given first valid run",
275  payload=name, **{"actual start validity": iovs.first,
276  "expected start validity": self._valid_from})
277  errors['starts too early'] += 1
278 
279  # Do we want to open iovs?
280  if self._fix_closed:
281  # Then do so ...
282  for payload in latest_iov.values():
283  if payload.iov[2:] != (-1, -1):
284  B2INFO("Extending closed iov to infinity", name=payload.name,
285  **{"old iov": payload.iov})
286  payload.iov = payload.iov[:2] + (-1, -1)
287  full_coverage[payload.name].add(payload.iov, allow_overlaps=True)
288 
289  # Any errors?
290  if any(errors.values()):
291  raise RunningTagUpdaterError(f"Staging tag '{tagname}' not fit for update", **errors)
292 
293  # No errors, great, remember the coverages for all payloads to select the
294  # proper payloads to be closed in the running tag and where to close them
295  self._staging_coverage = dict(full_coverage)
296 
297  def calculate_update(self):
298  """
299  Calculate the operations needed to merge staging into the running base tag
300  """
301  # Check all tags and payloads
302  self._check_all()
303  # Ok, all checks done ...
304  valid_range = IntervalOfValidity(*(self._valid_from + (-1, -1)))
305  operations = []
306  # we only need to close payloads that are present in staging ... unless
307  # we run in FULL mode in which we close everything
308  for p in self._running_payloads:
309  # so check if we have a coverage in the staging tag
310  staging = self._staging_coverage.get(p.name, None)
311  if p.iov[2:] == (-1, -1):
312  if staging is None:
313  # payload not present in staging tag. Ignore unless we do full replacement
314  if not self._mode == RunningTagUpdateMode.FULL_REPLACEMENT:
315  continue
316  staging_range = valid_range
317  else:
318  # extend the staging to infinity to make sure we close existing payloads correctly
319  staging_range = IntervalOfValidity(*(staging.first + (-1, -1))) & valid_range
320 
321  # if the first payload didn't change revision we don't need to make
322  # a new iov but can just extend the existing one
323  first_iov = self._staging_first_iovs.get(p.name, None)
324  if first_iov is not None and first_iov.revision == p.revision:
325  staging_range -= IntervalOfValidity(first_iov.iov)
326  self._staging_payloads.remove(first_iov)
327  # there's a chance this is empty now
328  if not staging_range:
329  continue
330  # close the existing iov before the range covered in staging
331  p.iov = (IntervalOfValidity(p.iov) - staging_range).tuple
332  # and mark the iov for closing
333  operations.append(["CLOSE", p])
334  # and all payloads that need adjusting in staging
335  for p in self._staging_payloads:
336  # clip them to the valid range ... which only affects iovs starting
337  # from 0,0. Everything else would have raised an error
338  p.iov = (IntervalOfValidity(p.iov) & valid_range).tuple
339  # and add them to the list
340  operations.append(["CREATE", p])
341 
342  # remember the operations in case we want to apply them
343  self._operations = operations
344  return self._operations
345 
346  def apply_update(self):
347  """Apply a previously calculated update to the globaltag
348 
349  Warning:
350  This action cannot be undone, only call it after checking the
351  operations returned by the calculation of the update
352  """
353  if self._operations is None:
354  raise RunningTagUpdaterError("Update needs to be calculated first")
355 
356  operations = []
357  for op, payload in self._operations:
358  if op == "CLOSE":
359  operations.append({"operation": "MODIFY", "data": [payload.iov_id] + list(payload.iov[2:])})
360  elif op == "CREATE":
361  operations.append({"operation": "CREATE", "data": [payload.payload_id] + list(payload.iov)})
362  else:
363  raise RunningTagUpdaterError(f"Unknown operation type: {op}")
364 
365  if not operations:
366  return
367 
368  tag = self._running_info['name']
369  try:
370  self._db.request("POST", f"/globalTagPayload/{tag}/updateRunningPayloads",
371  f"updating running tag {tag}", json=operations)
372  except ConditionsDB.RequestError as e:
373  raise RunningTagUpdaterError(f"Cannot update running tag {tag}", error=e) from e
conditions_db.runningupdate.RunningTagUpdater
Definition: runningupdate.py:47
conditions_db.runningupdate.RunningTagUpdater._check_running_tag
def _check_running_tag(self, tagname, payloads)
Definition: runningupdate.py:144
conditions_db.runningupdate.RunningTagUpdater._allow_closed
_allow_closed
Do we allow closed iovs in staging?
Definition: runningupdate.py:86
conditions_db.runningupdate.RunningTagUpdater._check_all
def _check_all(self)
Definition: runningupdate.py:132
conditions_db.runningupdate.RunningTagUpdater._staging_payloads
_staging_payloads
Payloads currently in the staging tag.
Definition: runningupdate.py:108
conditions_db.runningupdate.RunningTagUpdater._db
_db
Reference to the database object to use for queries.
Definition: runningupdate.py:69
conditions_db.runningupdate.RunningTagUpdater._check_state
def _check_state(self, tagname, taginfo, required)
Definition: runningupdate.py:112
conditions_db.runningupdate.RunningTagUpdater._running_payloads
_running_payloads
Payloads currently in the running tag.
Definition: runningupdate.py:106
conditions_db.runningupdate.RunningTagUpdater.calculate_update
def calculate_update(self)
Definition: runningupdate.py:297
conditions_db.runningupdate.RunningTagUpdater._mode
_mode
True if we want to allow payloads in the staging tag to be closed, for example when retireing a paylo...
Definition: runningupdate.py:84
conditions_db.runningupdate.RunningTagUpdater._operations
_operations
Operations for the update, filled by calculate_update()
Definition: runningupdate.py:94
conditions_db.iov.IntervalOfValidity
Definition: iov.py:16
conditions_db.runningupdate.RunningTagUpdaterError
Definition: runningupdate.py:35
conditions_db.runningupdate.RunningTagUpdateMode
Definition: runningupdate.py:15
conditions_db.ConditionsDB.RequestError
Definition: __init__.py:150
conditions_db.runningupdate.RunningTagUpdater._check_staging_tag
def _check_staging_tag(self, tagname, payloads)
Definition: runningupdate.py:200
conditions_db.runningupdate.RunningTagUpdaterError.extra_vars
extra_vars
extra keyword arguments given to the exception constructor
Definition: runningupdate.py:44
conditions_db.runningupdate.RunningTagUpdater.apply_update
def apply_update(self)
Definition: runningupdate.py:346
conditions_db.runningupdate.RunningTagUpdater._staging_coverage
_staging_coverage
Dictionary mapping payload names in the staging tag to the coverage they have in the staging tag,...
Definition: runningupdate.py:92
conditions_db.runningupdate.RunningTagUpdater._fix_closed
_fix_closed
Do we want to automatically open closed iovs?
Definition: runningupdate.py:89
conditions_db.runningupdate.RunningTagUpdater._staging_first_iovs
_staging_first_iovs
First iov per payload name in staging to not close and open the same revision.
Definition: runningupdate.py:110
conditions_db.runningupdate.RunningTagUpdater._check_staging_tag_simple
def _check_staging_tag_simple(self, tagname, payloads)
Definition: runningupdate.py:171
conditions_db.runningupdate.RunningTagUpdater._running_info
_running_info
Globaltag information for the running tag.
Definition: runningupdate.py:97
conditions_db.iov.IoVSet
Definition: iov.py:269
conditions_db.runningupdate.RunningTagUpdater._staging_info
_staging_info
Globaltag information for the staging tag.
Definition: runningupdate.py:99
conditions_db.runningupdate.RunningTagUpdater._valid_from
_valid_from
First valid run for the update.
Definition: runningupdate.py:80