Belle II Software development
runningupdate.py
1#!/usr/bin/env python3
2
3
10
11"""
12This module contains some classes to check the possibility and calculate the
13necessary updates for a running globaltag
14"""
15
16from enum import Enum
17from collections import defaultdict
18from basf2 import B2ERROR, B2WARNING, B2INFO # noqa
19from conditions_db import ConditionsDB
20from conditions_db.iov import IntervalOfValidity, IoVSet
21
22
class RunningTagUpdateMode(Enum):
    """Define the different modes for the update of a running tag"""

    #: Closed payloads in the staging tag are treated as errors
    STRICT = 1
    #: Closed payloads in the staging tag are accepted, only a warning is shown
    ALLOW_CLOSED = 2
    #: Closed payloads in the staging tag are accepted and the last iov of
    #: each payload is extended to infinity
    FIX_CLOSED = 3
    #: The staging tag is checked with the simple check: exactly one
    #: (0,0,-1,-1) iov per payload name
    SIMPLE = 4
    #: Closed payloads are accepted and open payloads in the running tag are
    #: closed even if they are not present in the staging tag.
    #: The value jumps from 4 to 6; it is kept unchanged on purpose.
    FULL_REPLACEMENT = 6
41
42
class RunningTagUpdaterError(Exception):
    """
    Raised whenever the update of the running globaltag cannot proceed.

    Besides the description, arbitrary keyword arguments can be attached.
    They are kept in `extra_vars` so they can be shown to the user as
    additional information.
    """

    def __init__(self, description, **extra_vars):
        """Keep the extra keyword arguments and hand the description to `Exception`"""
        #: extra keyword arguments given to the exception constructor
        self.extra_vars = extra_vars
        super().__init__(description)
54
55
57 """
58 Calculate and apply the necessary changes to update a running globaltag
59
60 For this we take two globaltags: the running one and a staging one
61 containing all the payloads and iovs to be added to the running tag. We then
62
63 1. Make sure they are in the correct states (RUNNING and VALIDATED)
64 2. Make sure all payloads in the running tag start and end (except for open
65 iovs) before the given ``valid_from`` run
66 3. Make sure the staging tag is overlap and gap free
67 4. Make all payloads in staging start at either 0,0 or on/after the given
68 ``valid_from`` run
69 5. Make sure all payloads in staging are unbound unless the mode is
70 ``ALLOW_CLOSED`` or ``FIX_CLOSED``. In case of ``FIX_CLOSED`` extend the
71 last iov to infinity
72 6. Close all payloads to be updated in the running tag that are open just
73 before the validity in the staging tag.
74 """
75
def __init__(self, db, running, staging, valid_from, mode, dry_run=False):
    """Initialize the class

    Arguments:
        db (ConditionsDB): reference to the database object
        running (str): name of the running tag
        staging (str): name of the staging tag
        valid_from (tuple(int,int)): first valid exp,run
        mode (RunningTagUpdateMode): the mode of the update
        dry_run (bool): If true only check, don't do anything.
            But be more lenient with globaltag state of staging.

    Raises:
        a `RunningTagUpdaterError` if ``valid_from`` cannot be converted to
        two integers or if one of the tags fails the state check
    """
    #: Reference to the database object to use for queries
    self._db = db

    # make sure the valid_from is a tuple of two ints
    try:
        valid_from = tuple(map(int, valid_from))
        if len(valid_from) != 2:
            raise ValueError("exp,run number needs to have two elements")
    except Exception as e:
        raise RunningTagUpdaterError("No first valid run for the update specified", error=str(e)) from e

    #: If we're in dry run mode be less critical about globaltag states
    #: (just show warnings). Needs to be set before `_check_state` is called
    self._dry_run = dry_run
    #: First valid run for the update
    self._valid_from = valid_from
    #: The mode of the update
    self._mode = mode
    #: Do we allow closed iovs in staging?
    self._allow_closed = mode in (RunningTagUpdateMode.ALLOW_CLOSED, RunningTagUpdateMode.FIX_CLOSED,
                                  RunningTagUpdateMode.FULL_REPLACEMENT)
    #: Do we want to automatically open closed iovs?
    self._fix_closed = (mode == RunningTagUpdateMode.FIX_CLOSED)
    #: Dictionary mapping payload names in the staging tag to the coverage
    #: they have in the staging tag, filled by the staging tag checks
    self._staging_coverage = None
    #: Operations for the update, filled by calculate_update()
    self._operations = None

    #: Globaltag information for the running tag
    self._running_info = db.get_globalTagInfo(running)
    #: Globaltag information for the staging tag
    self._staging_info = db.get_globalTagInfo(staging)
    # make sure the tags can be found and have the correct state
    self._check_state(running, self._running_info, "RUNNING")
    self._check_state(staging, self._staging_info, "VALIDATED")

    # Get the actual payloads
    #: Payloads currently in the running tag
    self._running_payloads = db.get_all_iovs(self._running_info['name'])
    #: Payloads currently in the staging tag
    self._staging_payloads = db.get_all_iovs(self._staging_info['name'])
    #: First iov per payload name in staging to not close and open the same
    #: revision. Has to exist before the staging checks run as they read it
    #: with ``.get()`` and assign to it by key
    self._staging_first_iovs = {}
132
134
135 def _check_state(self, tagname, taginfo, required):
136 """Check the state of a globaltag given the tag information object returned by the database
137
138 1) that it's found and
139 2) that it has the same state as in ``required``
140
141 Parameters:
142 tagname: name of the tag for error messages
143 taginfo: tag information returned from the database, None if the tag could not be found
144 required: required state for the tag.
145
146 Raises:
147 an `RunningTagUpdaterError` if any condition is not fulfilled
148 """
149 if taginfo is None:
150 raise RunningTagUpdaterError(f"Globaltag '{tagname}' cannot be found")
151 state = taginfo['globalTagStatus']['name'].upper()
152 if state != required.upper():
153 if self._dry_run:
154 B2WARNING(f"Globaltag '{tagname}' not in {required.upper()} state, continuing to display changes")
155 return
156 raise RunningTagUpdaterError(f"Globaltag '{tagname}' not in {required.upper()} state", state=state)
157
def _check_all(self):
    """Run all necessary checks on all globaltags

    The running tag is always checked against the first valid run. The
    staging tag is checked by `_check_staging_tag_simple` in ``SIMPLE`` mode
    and by `_check_staging_tag` in all other modes.

    Raises:
        a `RunningTagUpdaterError` if any of the checks fails
    """
    # And check both tags
    self._check_running_tag(self._running_info['name'], self._running_payloads)
    # and check the staging tag
    if self._mode == RunningTagUpdateMode.SIMPLE:
        # the simple mode is so much simpler that a different method is best
        self._check_staging_tag_simple(self._staging_info['name'], self._staging_payloads)
    else:
        # all other modes are covered here
        self._check_staging_tag(self._staging_info['name'], self._staging_payloads)
169
170 def _check_running_tag(self, tagname, payloads):
171 """
172 Check that all payloads in the running tag start and end (or are open)
173 before the first valid run for the update
174 """
175 errors = {
176 "payloads start after first valid run": 0,
177 "payloads end after first valid run": 0
178 }
179 earliest_valid_from = (0, 0)
180 for p in payloads:
181 iov = IntervalOfValidity(p.iov)
182 # starting of a validity is simple ... it needs to be below the first valid run
183 if iov.first >= self._valid_from:
184 B2ERROR(f"Payload in running tag '{tagname}' starts after first valid run",
185 payload=p.name, iov=p.iov, **{"first valid run": self._valid_from})
186 errors["payloads start after first valid run"] += 1
187 # end of a validity only matters for closed iovs, open iovs can get clipped
188 elif iov.final != IntervalOfValidity.always().final and iov.final >= self._valid_from:
189 B2ERROR(f"Payload in running tag '{tagname}' ends after first valid run",
190 payload=p.name, iov=p.iov, **{"first valid run": self._valid_from})
191 errors["payloads end after first valid run"] += 1
192
193 earliest_valid_from = max(earliest_valid_from, iov.first)
194 if iov.final != IntervalOfValidity.always().final:
195 earliest_valid_from = max(earliest_valid_from, iov.final)
196
197 if self._dry_run:
198 B2INFO("Earliest possible update of the running tag would be exp "
199 f"{earliest_valid_from[0]}, run {earliest_valid_from[1] + 1}")
200
201 # show errors if we have any ...
202 if any(errors.values()):
203 raise RunningTagUpdaterError("Given first valid run conflicts with "
204 f"running tag '{tagname}'", **errors)
205
def _check_staging_tag_simple(self, tagname, payloads):
    """
    Check a staging tag for the simple mode: every payload name may occur
    only once and every iov has to be exactly (0,0,-1,-1).

    On success the coverage of every payload name is set to "always" and
    each payload is remembered as the first iov for its name.

    Arguments:
        tagname: name of the staging tag for error messages
        payloads: payloads in the staging tag, each providing ``name`` and ``iov``

    Raises:
        a `RunningTagUpdaterError` if there are duplicate names or other validities
    """
    seen_names = set()
    n_duplicates = 0
    n_wrong_iov = 0
    for entry in payloads:
        if entry.name in seen_names:
            B2ERROR(f"Duplicate payload in staging tag '{tagname}'", name=entry.name)
            n_duplicates += 1
        else:
            seen_names.add(entry.name)

        if entry.iov != (0, 0, -1, -1):
            n_wrong_iov += 1
            B2ERROR(f"Wrong validity for payload in staging tag '{tagname}'", name=entry.name, validity=entry.iov)

        # with only one payload per name the "first" one is trivially this one
        self._staging_first_iovs[entry.name] = entry

    # how many errors did we have?
    if n_duplicates or n_wrong_iov:
        raise RunningTagUpdaterError(f"Staging tag '{tagname}' not fit for update in simple mode",
                                     **{"duplicate payloads": n_duplicates, "wrong validity": n_wrong_iov})

    # everything is fine: all names share one coverage object spanning everything
    self._staging_coverage = dict.fromkeys(seen_names, IoVSet([IntervalOfValidity.always()]))
234
def _check_staging_tag(self, tagname, payloads):
    """
    Verify that the staging tag can be used for the update, namely that

    1. no two iovs of the same payload overlap
    2. the iovs of each payload leave no gaps
    3. each payload is open ended (unless closed payloads are allowed)
    4. each payload starts at 0,0 or not before the first valid run

    All problems are reported before an exception is raised. On success the
    coverage per payload name is stored in ``_staging_coverage``.

    Arguments:
        tagname (str): Name of the globaltag for error messages
        payloads (list(conditions_db.PayloadInformation)): List of payloads in the tag

    Raises:
        a `RunningTagUpdaterError` if any of the conditions is violated
    """
    # union of all iovs per payload name, used for the overlap/gap check and
    # later on to close the open iovs in the running tag at the right place
    coverage_all = defaultdict(IoVSet)
    # the same but without iovs starting at (0,0): those get adjusted to the
    # first valid run anyway
    coverage_explicit = defaultdict(IoVSet)
    # latest payload per name, only filled if closed iovs should be opened
    last_payload = {}
    # counters handed to the exception
    counts = {'overlaps': 0, 'gaps': 0, 'starts too early': 0, 'closed payloads': 0}

    for entry in payloads:
        name = entry.name
        validity = IntervalOfValidity(entry.iov)
        try:
            coverage_all[name].add(validity, allow_overlaps=False)
        except ValueError as overlap:
            B2ERROR(f"Overlap in globaltag '{tagname}'", payload=name, overlap=overlap)
            counts['overlaps'] += 1
            # add it regardless so that further overlaps with the remaining
            # iovs are reported as well
            coverage_all[name].add(validity, allow_overlaps=True)

        if validity.first != (0, 0):
            coverage_explicit[name].add(validity, allow_overlaps=True)

        if self._fix_closed:
            known = last_payload.get(name)
            last_payload[name] = entry if known is None else max(entry, known)

        # the first iov per name is needed to extend what's in running if they match
        earliest = self._staging_first_iovs.get(name)
        self._staging_first_iovs[name] = entry if earliest is None else min(entry, earliest)

    # more than one piece in the union of a payload means there are gaps
    for name, covered in coverage_all.items():
        n_pieces = len(covered)
        if n_pieces > 1:
            B2ERROR(f"Gap in globaltag '{tagname}'", payload=name, gaps=covered.gaps)
            counts['gaps'] += n_pieces - 1

        if covered.final == IntervalOfValidity.always().final:
            continue
        # not open ended: an error, or just a warning if closed payloads are allowed
        if self._allow_closed:
            report = B2WARNING
        else:
            report = B2ERROR
            counts['closed payloads'] += 1
        report(f"Payload in globaltag '{tagname}' not open ended",
               payload=name, **{"final run": covered.final})

    # everything not starting at (0,0) must not start before the first valid run
    for name, covered in coverage_explicit.items():
        if covered.first < self._valid_from:
            B2ERROR(f"Payload in globaltag '{tagname}' starts before the given first valid run",
                    payload=name, **{"actual start validity": covered.first,
                                     "expected start validity": self._valid_from})
            counts['starts too early'] += 1

    # open the latest iov of each payload if requested
    if self._fix_closed:
        for entry in last_payload.values():
            if entry.iov[2:] == (-1, -1):
                continue
            B2INFO("Extending closed iov to infinity", name=entry.name,
                   **{"old iov": entry.iov})
            entry.iov = entry.iov[:2] + (-1, -1)
            coverage_all[entry.name].add(entry.iov, allow_overlaps=True)

    if any(counts.values()):
        raise RunningTagUpdaterError(f"Staging tag '{tagname}' not fit for update", **counts)

    # no errors: keep the coverage to select which payloads to close in the
    # running tag and where to close them
    self._staging_coverage = dict(coverage_all)
331
def calculate_update(self):
    """
    Calculate the operations needed to merge staging into the running base tag

    All checks are run first. Open iovs in the running tag which are covered
    by the staging tag (or all open iovs in ``FULL_REPLACEMENT`` mode) are
    closed and all remaining staging payloads are clipped to start at the
    first valid run. The iovs of the payload objects are modified in place.

    Returns:
        list of ``[operation, payload]`` pairs where operation is either
        ``"CLOSE"`` or ``"CREATE"``. The same list is remembered for
        `apply_update`

    Raises:
        a `RunningTagUpdaterError` if any of the checks fails
    """
    # Check all tags and payloads
    self._check_all()
    # Ok, all checks done ...
    valid_range = IntervalOfValidity(*(self._valid_from + (-1, -1)))
    operations = []
    # we only need to close payloads that are present in staging ... unless
    # we run in FULL mode in which we close everything
    for p in self._running_payloads:
        # so check if we have a coverage in the staging tag
        staging = self._staging_coverage.get(p.name, None)
        if p.iov[2:] == (-1, -1):
            if staging is None:
                # payload not present in staging tag. Ignore unless we do full replacement
                if self._mode != RunningTagUpdateMode.FULL_REPLACEMENT:
                    continue
                staging_range = valid_range
            else:
                # extend the staging to infinity to make sure we close existing payloads correctly
                staging_range = IntervalOfValidity(*(staging.first + (-1, -1))) & valid_range

                # if the first payload didn't change revision we don't need to make
                # a new iov but can just extend the existing one
                first_iov = self._staging_first_iovs.get(p.name, None)
                if first_iov is not None and first_iov.revision == p.revision:
                    staging_range -= IntervalOfValidity(first_iov.iov)
                    self._staging_payloads.remove(first_iov)
                    # there's a chance this is empty now
                    if not staging_range:
                        continue
            # close the existing iov before the range covered in staging
            p.iov = (IntervalOfValidity(p.iov) - staging_range).tuple
            # and mark the iov for closing
            operations.append(["CLOSE", p])
    # and all payloads that need adjusting in staging
    for p in self._staging_payloads:
        # clip them to the valid range ... which only affects iovs starting
        # from 0,0. Everything else would have raised an error
        p.iov = (IntervalOfValidity(p.iov) & valid_range).tuple
        # and add them to the list
        operations.append(["CREATE", p])

    # remember the operations in case we want to apply them
    self._operations = operations
    return self._operations
380
def apply_update(self):
    """Send a previously calculated update to the database

    ``"CLOSE"`` operations are sent as ``MODIFY`` requests carrying the iov
    id and the new final exp,run; ``"CREATE"`` operations carry the payload
    id and the full iov. Nothing is sent if there are no operations.

    Warning:
        This action cannot be undone, only call it after checking the
        operations returned by the calculation of the update

    Raises:
        a `RunningTagUpdaterError` in dry-run mode, if no update was
        calculated, for unknown operation types or if the request fails
    """
    if self._dry_run:
        raise RunningTagUpdaterError("Called in dry-run mode, refusing to cooperate")

    if self._operations is None:
        raise RunningTagUpdaterError("Update needs to be calculated first")

    requests = []
    for kind, entry in self._operations:
        if kind == "CLOSE":
            request = {"operation": "MODIFY", "data": [entry.iov_id, *entry.iov[2:]]}
        elif kind == "CREATE":
            request = {"operation": "CREATE", "data": [entry.payload_id, *entry.iov]}
        else:
            raise RunningTagUpdaterError(f"Unknown operation type: {kind}")
        requests.append(request)

    if requests:
        tagname = self._running_info['name']
        try:
            self._db.request("POST", f"/globalTagPayload/{tagname}/updateRunningPayloads",
                             f"updating running tag {tagname}", json=requests)
        except ConditionsDB.RequestError as e:
            raise RunningTagUpdaterError(f"Cannot update running tag {tagname}", error=e) from e
extra_vars
extra keyword arguments given to the exception constructor
def __init__(self, description, **extra_vars)
Initialize the class.
_running_payloads
Payloads currently in the running tag.
def _check_staging_tag(self, tagname, payloads)
_valid_from
First valid run for the update.
_mode
True if we want to allow payloads in the staging tag to be closed, for example when retiring a payloa...
_staging_payloads
Payloads currently in the staging tag.
_staging_info
Globaltag information for the staging tag.
def _check_running_tag(self, tagname, payloads)
_staging_coverage
Dictionary mapping payload names in the staging tag to the coverage they have in the staging tag,...
_staging_first_iovs
First iov per payload name in staging to not close and open the same revision.
_allow_closed
Do we allow closed iovs in staging?
_running_info
Globaltag information for the running tag.
_fix_closed
Do we want to automatically open closed iovs?
def __init__(self, db, running, staging, valid_from, mode, dry_run=False)
def _check_state(self, tagname, taginfo, required)
_operations
Operations for the update, filled by calculate_update()
_dry_run
If we're in dry run mode be less critical about globaltag states (just show warnings) but refuse to d...
def _check_staging_tag_simple(self, tagname, payloads)
_db
Reference to the database object to use for queries.