10 from concurrent.futures
import ThreadPoolExecutor
11 from collections
import defaultdict
12 from basf2
import B2INFO, B2ERROR, B2WARNING, LogPythonInterface
14 from terminal_utils
import Pager
15 from conditions_db
import set_cdb_authentication_token
20 def get_all_iovsets(existing_payloads, run_range=None):
21 """Given a list of PayloadInformation objects, return a reduced list of PayloadInformation
22 objects with the single iovs replaced with IoVSets. Payloads with the same
23 name and revision will be merged.
25 Overlaps will be reported with a B2ERROR
27 all_payloads = defaultdict(
lambda: IoVSet(allow_overlaps=
True))
28 by_name = defaultdict(
lambda: IoVSet(allow_overlaps=
False))
30 for payload
in existing_payloads:
32 iov = IntervalOfValidity(payload.iov)
34 if run_range
is not None:
40 all_payloads[payload.name, payload.revision].add(iov)
42 infos[payload.name, payload.revision] = payload
46 by_name[payload.name].add(iov)
47 except ValueError
as e:
48 B2ERROR(f
"Overlap for payload {payload.name} r{payload.revision}: {e}")
52 for (name, revision), iov
in all_payloads.items():
53 info = infos[name, revision]
def create_iov_wrapper(db, globaltag_id, payload):
    """
    Wrapper function for adding payloads into a given globaltag.

    Creates one iov in the globaltag for every interval of validity attached
    to the payload and stops at the first one that cannot be created.

    Parameters:
        db: database interface providing
            ``create_iov(globaltag_id, payload_id, *iov_tuple)``
        globaltag_id: id of the globaltag the iovs should be created in
        payload: payload information object; ``payload.iov`` is iterated and
            every element has to expose its boundaries as ``.tuple``

    Raises:
        RuntimeError: if ``db.create_iov`` returns None for any of the iovs
    """
    for iov in payload.iov:
        # create_iov signals failure by returning None instead of raising, so
        # turn that into an exception the caller cannot silently ignore
        if db.create_iov(globaltag_id, payload.payload_id, *iov.tuple) is None:
            raise RuntimeError(f"Cannot create iov for {payload.name} r{payload.revision}")
70 def command_tag_merge(args, db=None):
72 Merge a list of globaltags in the order they are given.
74 This command allows merging a number of globaltags into a single globaltag.
75 Payloads from later globaltags in the list of arguments are used to fill gaps
76 present in earlier globaltags.
78 The result is equivalent to having multiple globaltags setup in the conditions
79 access for basf2 (highest priority goes first).
82 The order of the globaltags is highest priority first, so payloads from
83 globaltags earlier on the command line will be taken before globaltags
86 This command requires that all globaltags are overlap free.
88 For each globaltag in the list we copy all payloads to the output globaltag
89 if there is no payload of that name valid for the given interval of validity
90 in any previous globaltags in the list.
92 If the payload overlaps partially with a payload from a previous globaltag
93 in the list the interval of validity is shortened (and possibly split) to
94 not overlap but to just fill the gaps.
98 globaltag A contains ::
100 payload1, rev 2, valid from 1,0 to 1,10
101 payload1, rev 3, valid from 1,20 to 1,22
102 payload2, rev 1, valid from 1,0 to 1,-1
104 globaltag B contains ::
106 payload1, rev 1, valid from 1,1 to 1,30
107 payload2, rev 2, valid from 0,1 to 1,20
109 Then running ``b2conditionsdb tag merge -o output A B``, the output globaltag
110 after the merge will contain::
112 payload1, rev 2, valid from 1,0 to 1,10
113 payload1, rev 1, valid from 1,11 to 1,19
114 payload1, rev 3, valid from 1,20 to 1,22
115 payload1, rev 1, valid from 1,23 to 1,30
116 payload2, rev 2, valid from 0,1 to 0,-1
117 payload2, rev 1, valid from 1,0 to 1,-1
119 When finished, this command will print a table of payloads and their
120 validity and from which globaltag they were taken. If ``--dry-run`` is given
121 it will only print the list of payloads.
123 Optionally one can specify ``--run-range`` to limit the run range for which
124 the merging should take place. In the example above, running with
125 ``--run-range 1 0 1 21`` the result would be ::
127 payload1, rev 2, valid from 1,0 to 1,10
128 payload1, rev 1, valid from 1,11 to 1,19
129 payload1, rev 3, valid from 1,20 to 1,21
130 payload2, rev 1, valid from 1,0 to 1,21
132 .. versionadded:: release-05-01-00
136 args.add_argument(
"globaltag", nargs=
"+", help=
"name of the globaltag")
137 group = args.add_argument_group(
"required named arguments")
138 group.add_argument(
"-o",
"--output", required=
True, help=
"Name of the output globaltag")
139 args.add_argument(
"--dry-run", help=
"Don't do anything, just print a table with the results",
140 action=
"store_true", default=
False)
141 args.add_argument(
"--run-range", nargs=4, default=
None, type=int,
142 metavar=(
"FIRST_EXP",
"FIRST_RUN",
"FINAL_EXP",
"FINAL_RUN"),
143 help=
"Can be for numbers to limit the run range to put"
144 "in the output globaltag: All iovs will be limited to "
146 args.add_argument(
"-j", type=int, default=10, dest=
"nprocess",
147 help=
"Number of concurrent threads to use for "
148 "creating payloads into the output globaltag.")
152 set_cdb_authentication_token(db, args.auth_token)
155 support_color = LogPythonInterface.terminal_supports_colors()
157 colors =
"\x1b[32m \x1b[34m \x1b[35m \x1b[31m".split()
158 colors = {tag: color
for tag, color
in zip(args.globaltag, colors)}
160 def color_row(row, _, line):
161 """Color the lines depending on which globaltag the payload comes from"""
162 if not support_color:
164 begin = colors.get(row[-1],
"")
166 return begin + line + end
168 with Pager(
"Result of merging globaltags",
True):
170 output_id = db.get_globalTagInfo(args.output)
171 if output_id
is None:
172 B2ERROR(
"Output globaltag doesn't exist. Please create it first with a proper description")
175 output_id = output_id[
"globalTagId"]
178 if any(db.get_globalTagInfo(tag)
is None for tag
in args.globaltag):
183 existing = defaultdict(
lambda: IoVSet(allow_overlaps=
True))
184 if args.run_range
is not None:
185 args.run_range = IntervalOfValidity(args.run_range)
188 for tag
in args.globaltag:
190 all_payloads = db.get_all_iovs(tag)
192 all_payloads.sort(key=
lambda p: p.revision, reverse=
True)
195 all_payloads.sort(key=
lambda p: p.name, reverse=
False)
197 payloads = get_all_iovsets(all_payloads, args.run_range)
198 for payload
in payloads:
200 payload.iov.remove(existing[payload.name])
204 existing[payload.name] |= payload.iov
206 final.append(payload)
208 for iov
in payload.iov:
209 table.append([payload.name, payload.revision] + list(iov.tuple) + [tag])
212 table.sort(key=
lambda r: (r[0], r[3:5]))
215 table.insert(0, [
"Name",
"Rev",
"First Exp",
"First Run",
"Final Exp",
"Final Run",
"Source"])
216 columns = [
"+", -8, 6, 6, 6, 6, max(len(_)
for _
in args.globaltag)]
218 B2INFO(f
"Result of merging the globaltags {', '.join(args.globaltag)}")
220 pretty_print_table(table, columns, transform=color_row)
224 B2INFO(f
'Now copying the {len(final)} payloads into {args.output} to create {len(table)-1} iovs ...')
225 create_iov = functools.partial(create_iov_wrapper, db, output_id)
227 with ThreadPoolExecutor(max_workers=args.nprocess)
as pool:
228 start = time.monotonic()
229 for payload, _
in enumerate(pool.map(create_iov, final), 1):
230 eta = (time.monotonic() - start) / payload * (len(final) - payload)
231 B2INFO(f
"{payload}/{len(final)} payloads copied, ETA: {eta:.1f} seconds")
233 B2ERROR(
"Not all iovs could be created. This could be a server/network problem "
234 "or the destination globaltag was not empty or not writeable. Please make "
235 "sure the target tag is empty and try again")
241 def command_tag_runningupdate(args, db=None):
243 Update a running globaltag with payloads from a staging tag
245 This command will calculate and apply the necessary updates to a running
246 globaltag with a given staging globaltag
248 Running tags are defined as "immutable for existing data but conditions for
249 newer runs may be added" and the only modification allowed is to add new
250 payloads for new runs or close existing payloads to no longer be valid for
253 This command takes previously prepared and validated payloads in a staging
254 globaltag and will then calculate which payloads to close and what to add to
255 the running globaltag.
257 For this to work we require
259 1. A running globaltag in the state "RUNNING"
261 2. A (experiment, run) number from which run on the update should be valid.
262 This run number needs to be
264 a) bigger than the start of validity for all iovs in the running tag
265 b) bigger than the end of validity for all closed iovs (i.e. not valid
266 to infinity) in the running tag
268 3. A staging globaltag with the new payloads in state "VALIDATED"
270 a) payloads in the staging tag starting at (0,0) will be interpreted as
271 starting at the first valid run for the update
272 b) all other payloads need to start at or after the first valid run for
274 c) The globaltag needs to be gap and overlap free
275 d) All payloads in the staging tag should have as last iov an open iov
276 (i.e. valid to infinity) but this can be disabled.
278 The script will check all the above requirements and will then calculate the
279 necessary operations to
281 1. Add all payloads from the staging tag where a start validity of (0, 0) is
282 replaced by the starting run for which this update should be valid.
283 2. close all iovs for payloads in the running tags just before the
284 corresponding iov of the same payload in the staging tag, so either at the
285 first run for the update to be valid or later
286 3. Optionally, make sure all payloads in the staging tag end in an open iov.
290 running tag contains ::
292 payload1, rev 1, valid from 0,1 to 1,0
293 payload1, rev 2, valid from 1,1 to -1,-1
294 payload2, rev 1, valid from 0,1 to -1,-1
295 payload3, rev 1, valid from 0,1 to 1,0
296 payload4, rev 1, valid from 0,1 to -1,-1
297 payload5, rev 1, valid from 0,1 to -1,-1
299 staging tag contains ::
301 payload1, rev 3, valid from 0,0 to 1,8
302 payload1, rev 4, valid from 1,9 to 1,20
303 payload2, rev 2, valid from 1,5 to 1,20
304 payload3, rev 2, valid from 0,0 to -1,-1
305 payload4, rev 1, valid from 0,0 to 1,20
307 Then running ``b2conditionsdb tag runningupdate running staging --run 1 2 --allow-closed``,
308 the running globaltag after the update will contain ::
310 payload1, rev 1, valid from 0,1 to 1,0
311 payload1, rev 2, valid from 1,1 to 1,1
312 payload1, rev 3, valid from 1,2 to 1,8
313 payload1, rev 4, valid from 1,9 to 1,20
314 payload2, rev 1, valid from 0,1 to 1,4
315 payload2, rev 2, valid from 1,5 to 1,20
316 payload3, rev 1, valid from 0,1 to 1,0
317 payload3, rev 2, valid from 1,2 to -1,-1
318 payload4, rev 1, valid from 0,1 to 1,20
319 payload5, rev 1, valid from 0,1 to -1,-1
323 - the start of payload1 and payload3 in staging has been adjusted
324 - payload2 in the running tag has been closed at 1,4, just before the
325 validity from the staging tag
326 - payload3 was already closed in the running tag so no change is
327 performed. This might result in gaps but is intentional
328 - payload4 was not closed at run 1,2 because the staging tag had the same
329 revision of the payload so these were merged into one long validity.
330 - payload5 was not closed as there was no update to it in the staging tag.
331 If we would have run with ``--full-replacement`` it would have been closed.
332 - if we would have chosen ``--run 1 1`` the update would have failed because
333 payload1, rev2 in running starts at 1,1 so we would have a conflict
334 - if we would have chosen ``--run 1 6`` the update would have failed because
335 payload2 in the staging tag starts before this run
336 - if we would have chosen to open the final iovs in staging by using
337 ``--fix-closed``, payload1, rev 4; payload2, rev 2 and payload4 rev 1
338 would be valid until -1,-1 after the running tag. In fact, payload 4
339 would not be changed at all.
342 args.add_argument(
"running", help=
"name of the running globaltag")
343 args.add_argument(
"staging", help=
"name of the staging globaltag")
344 group = args.add_argument_group(
"required named arguments")
345 group.add_argument(
"-r",
"--run", required=
True, nargs=2, type=int, metavar=(
"EXP",
"RUN"),
346 help=
"First experiment + run number for which the update should be "
347 "valid. Two numbers separated by space")
348 choice = args.add_mutually_exclusive_group()
349 choice.add_argument(
"--allow-closed", dest=
"mode", action=
"store_const",
350 const=RunningTagUpdateMode.ALLOW_CLOSED,
351 default=RunningTagUpdateMode.STRICT,
352 help=
"if given allow payloads in the staging tag to not "
353 "be open, i.e. they don't have to be open ended in the "
354 "update. Useful to retire a payload by adding one last update")
355 choice.add_argument(
"--fix-closed", dest=
"mode", action=
"store_const",
356 const=RunningTagUpdateMode.FIX_CLOSED,
357 help=
"if given automatically open the last iov for each "
358 "payload in staging if it is closed.")
359 choice.add_argument(
"--simple-mode", dest=
"mode", action=
"store_const",
360 const=RunningTagUpdateMode.SIMPLE,
361 help=
"if given require the staging tag to solely consist "
362 "of fully infinite validities: Only one iov per payload "
363 "with a validity of (0,0,-1,-1)")
364 choice.add_argument(
"--full-replacement", dest=
"mode", action=
"store_const",
365 const=RunningTagUpdateMode.FULL_REPLACEMENT,
366 help=
"if given perform a full replacement and close all "
367 "open iovs in the running tag not present in the staging tag. "
368 "After such an update exactly the payloads in the staging tag "
369 "will be valid after the given run. This allows for closed iovs "
370 "in the staging tag as with ``--allow-closed``")
371 args.add_argument(
"--dry-run", default=
False, action=
"store_true",
372 help=
"Only show the changes, don't try to apply them")
376 set_cdb_authentication_token(db, args.auth_token)
379 updater = RunningTagUpdater(db, args.running, args.staging, args.run, args.mode, args.dry_run)
380 operations = updater.calculate_update()
381 except RunningTagUpdaterError
as e:
382 B2ERROR(e, **e.extra_vars)
387 B2INFO(
"Nothing to do, please check the globaltags given are correct")
392 last_valid = tuple(args.run)
394 "first valid run": last_valid,
395 "payloads closed": 0,
396 "payloads updated": 0,
397 "payload iovs added": 0,
398 "next possible update": last_valid,
401 updated_payloads = set()
402 for op, payload
in operations:
405 summary[
'payloads closed'] += 1
407 updated_payloads.add(payload.name)
408 summary[
'payload iovs added'] += 1
410 last_valid = max(payload.iov[:2], last_valid)
412 if payload.iov[2:] != (-1, -1):
413 last_valid = max(payload.iov[2:], last_valid)
415 table.append([op, payload.name, payload.revision] + list(payload.iov))
418 summary[
'next possible update'] = (last_valid[0] + (1
if last_valid[1] < 0
else 0), last_valid[1] + 1)
420 summary[
'payloads updated'] = len(updated_payloads)
423 support_color = LogPythonInterface.terminal_supports_colors()
425 def color_row(row, _, line):
426 """Highlight the rows whose action is CLOSE in red, leave all other rows uncolored"""
427 if not support_color:
429 begin =
"" if row[0] !=
"CLOSE" else "\x1b[31m"
431 return begin + line + end
434 table.sort(key=
lambda x: (x[1], x[3:], x[2], x[0]))
435 table.insert(0, [
"Action",
"Payload",
"Rev",
"First Exp",
"First Run",
"Final Exp",
"Final Run"])
436 columns = [6,
'*', -6, 6, 6, 6, 6]
438 with Pager(f
"Changes to running tag {args.running}:",
True):
439 B2INFO(f
"Changes to be applied to the running tag {args.running}")
440 pretty_print_table(table, columns, transform=color_row)
443 B2INFO(
"Running in dry mode, not applying any changes.", **summary)
446 B2WARNING(
"Applying these changes cannot be undone and further updates to "
447 "this run range will **NOT** be possible", **summary)
449 answer = input(
"Are you sure you want to continue? [yes/No]: ")
450 while answer.lower().strip()
not in [
'yes',
'no',
'n',
'']:
451 answer = input(
"Please enter 'yes' or 'no': ")
453 if answer.lower().strip() !=
'yes':
454 B2INFO(
"Aborted by user ...")
459 updater.apply_update()
462 except RunningTagUpdaterError
as e:
463 B2ERROR(e, **e.extra_vars)