10 from concurrent.futures
import ThreadPoolExecutor
11 from collections
import defaultdict
12 from basf2
import B2INFO, B2ERROR, B2WARNING, LogPythonInterface
14 from terminal_utils
import Pager
19 def get_all_iovsets(existing_payloads, run_range=None):
20 """Given a list of PayloadInformation objects, return a reduced list PayloadInformation
21 objects with the single iovs replaced with IoVSets. Payloads with the same
22 name and revision will be merged.
24 Overlaps between payloads of the same name are reported with a B2ERROR
26 all_payloads = defaultdict(
lambda: IoVSet(allow_overlaps=
True))
27 by_name = defaultdict(
lambda: IoVSet(allow_overlaps=
False))
29 for payload
in existing_payloads:
31 iov = IntervalOfValidity(payload.iov)
33 if run_range
is not None:
39 all_payloads[payload.name, payload.revision].add(iov)
41 infos[payload.name, payload.revision] = payload
45 by_name[payload.name].add(iov)
46 except ValueError
as e:
47 B2ERROR(f
"Overlap for payload {payload.name}: {e}")
51 for (name, revision), iov
in all_payloads.items():
52 info = infos[name, revision]
def create_iov_wrapper(db, globaltag_id, payload):
    """
    Create all iovs of one payload in the given globaltag.

    Every element of ``payload.iov`` is passed on to ``db.create_iov`` together
    with the globaltag id and the payload id. The elements are expected to
    provide a ``tuple`` attribute which is unpacked into the remaining
    positional arguments of ``db.create_iov``.

    Arguments:
        db: object providing ``create_iov(globaltag_id, payload_id, *iov)``
        globaltag_id: id of the globaltag the iovs should be created in
        payload: object providing ``iov``, ``payload_id``, ``name`` and ``revision``

    Raises:
        RuntimeError: as soon as ``db.create_iov`` returns None for one of the
            iovs. Remaining iovs of the payload are not attempted.
    """
    for interval in payload.iov:
        created = db.create_iov(globaltag_id, payload.payload_id, *interval.tuple)
        if created is not None:
            continue
        # a None result is treated as failure and aborts the whole payload
        raise RuntimeError(f"Cannot create iov for {payload.name} r{payload.revision}")
69 def command_tag_merge(args, db=None):
71 Merge a list of globaltags in the order they are given.
73 This command merges a number of globaltags into a single globaltag.
74 Payloads from later globaltags in the list of arguments are used to fill gaps
75 present in earlier globaltags.
77 The result is equivalent to having multiple globaltags setup in the conditions
78 access for basf2 (highest priority goes first).
81 The order of the globaltags is highest priority first, so payloads from
82 globaltags earlier on the command line will be taken before globaltags
85 This command requires that all globaltags are overlap free.
87 For each globaltag in the list we copy all payloads to the output globaltag
88 if there is no payload of that name valid for the given interval of validity
89 in any previous globaltags in the list.
91 If the payload overlaps partially with a payload from a previous globaltag
92 in the list the interval of validity is shortened (and possibly split) to
93 not overlap but to just fill the gaps.
97 globaltag A contains ::
99 payload1, rev 2, valid from 1,0 to 1,10
100 payload1, rev 3, valid from 1,20 to 1,22
101 payload2, rev 1, valid from 1,0 to 1,-1
103 globaltag B contains ::
105 payload1, rev 1, valid from 1,1 to 1,30
106 payload2, rev 2, valid from 0,1 to 1,20
108 Then running ``b2conditionsdb tag merge -o output A B``, the output globaltag
109 after the merge will contain::
111 payload1, rev 2, valid from 1,0 to 1,10
112 payload1, rev 1, valid from 1,11 to 1,19
113 payload1, rev 3, valid from 1,20 to 1,22
114 payload1, rev 1, valid from 1,23 to 1,30
115 payload2, rev 2, valid from 0,1 to 0,-1
116 payload2, rev 1, valid from 1,0 to 1,-1
118 When finished, this command will print a table of payloads and their
119 validity and from which globaltag they were taken. If ``--dry-run`` is given
120 it will only print the list of payloads.
122 Optionally one can specify ``--run-range`` to limit the run range for which
123 the merging should take place. In the example above, running with
124 ``--run-range 1 0 1 21`` the result would be ::
126 payload1, rev 2, valid from 1,0 to 1,10
127 payload1, rev 1, valid from 1,11 to 1,19
128 payload1, rev 3, valid from 1,20 to 1,21
129 payload2, rev 1, valid from 1,0 to 1,21
131 .. versionadded:: release-05-01-00
135 args.add_argument(
"globaltag", nargs=
"+", help=
"name of the globaltag")
136 group = args.add_argument_group(
"required named arguments")
137 group.add_argument(
"-o",
"--output", required=
True, help=
"Name of the output globaltag")
138 args.add_argument(
"--dry-run", help=
"Don't do anything, just print a table with the results",
139 action=
"store_true", default=
False)
140 args.add_argument(
"--run-range", nargs=4, default=
None, type=int,
141 metavar=(
"FIRST_EXP",
"FIRST_RUN",
"FINAL_EXP",
"FINAL_RUN"),
142 help=
"Can be for numbers to limit the run range to put"
143 "in the output globaltag: All iovs will be limited to "
145 args.add_argument(
"-j", type=int, default=10, dest=
"nprocess",
146 help=
"Number of concurrent threads to use for "
147 "creating payloads into the output globaltag.")
151 support_color = LogPythonInterface.terminal_supports_colors()
153 colors =
"\x1b[32m \x1b[34m \x1b[35m \x1b[31m".split()
154 colors = {tag: color
for tag, color
in zip(args.globaltag, colors)}
156 def color_row(row, _, line):
157 """Color the lines depending on which globaltag the payload comes from"""
158 if not support_color:
160 begin = colors.get(row[-1],
"")
162 return begin + line + end
164 with Pager(
"Result of merging globaltags",
True):
166 output_id = db.get_globalTagInfo(args.output)
167 if output_id
is None:
168 B2ERROR(
"Output globaltag doesn't exist. Please create it first with a proper description")
171 output_id = output_id[
"globalTagId"]
174 if any(db.get_globalTagInfo(tag)
is None for tag
in args.globaltag):
179 existing = defaultdict(
lambda: IoVSet(allow_overlaps=
True))
180 if args.run_range
is not None:
181 args.run_range = IntervalOfValidity(args.run_range)
184 for tag
in args.globaltag:
186 payloads = get_all_iovsets(db.get_all_iovs(tag), args.run_range)
187 for payload
in payloads:
189 payload.iov.remove(existing[payload.name])
193 existing[payload.name] |= payload.iov
195 final.append(payload)
197 for iov
in payload.iov:
198 table.append([payload.name, payload.revision] + list(iov.tuple) + [tag])
201 table.sort(key=
lambda r: (r[0], r[3:5]))
204 table.insert(0, [
"Name",
"Rev",
"First Exp",
"First Run",
"Final Exp",
"Final Run",
"Source"])
205 columns = [
"+", -8, 6, 6, 6, 6, max(len(_)
for _
in args.globaltag)]
207 B2INFO(f
"Result of merging the globaltags {', '.join(args.globaltag)}")
209 pretty_print_table(table, columns, transform=color_row)
213 B2INFO(f
'Now copying the {len(final)} payloads into {args.output} to create {len(table)-1} iovs ...')
214 create_iov = functools.partial(create_iov_wrapper, db, output_id)
216 with ThreadPoolExecutor(max_workers=args.nprocess)
as pool:
217 start = time.monotonic()
218 for payload, _
in enumerate(pool.map(create_iov, final), 1):
219 eta = (time.monotonic() - start) / payload * (len(final) - payload)
220 B2INFO(f
"{payload}/{len(final)} payloads copied, ETA: {eta:.1f} seconds")
222 B2ERROR(
"Not all iovs could be created. This could be a server/network problem "
223 "or the destination globaltag was not empty or not writeable. Please make "
224 "sure the target tag is empty and try again")
230 def command_tag_runningupdate(args, db=None):
232 Update a running globaltag with payloads from a staging tag
234 This command will calculate and apply the necessary updates to a running
235 globaltag with a given staging globaltag
237 Running tags are defined as "immutable for existing data but conditions for
238 newer runs may be added" and the only modification allowed is to add new
239 payloads for new runs or close existing payloads to no longer be valid for
242 This command takes previously prepared and validated payloads in a staging
243 globaltag and will then calculate which payloads to close and what to add to
244 the running globaltag.
246 For this to work we require
248 1. A running globaltag in the state "RUNNING"
250 2. An (experiment, run) number from which on the update should be valid.
251 This run number needs to be
253 a) bigger than the start of validity for all iovs in the running tag
254 b) bigger than the end of validity for all closed iovs (i.e. not valid
255 to infinity) in the running tag
257 3. A staging globaltag with the new payloads in state "VALIDATED"
259 a) payloads in the staging tag starting at (0,0) will be interpreted as
260 starting at the first valid run for the update
261 b) all other payloads need to start at or after the first valid run for
263 c) The globaltag needs to be gap and overlap free
264 d) All payloads in the staging tag should have as last iov an open iov
265 (i.e. valid to infinity) but this can be disabled.
267 The script will check all the above requirements and will then calculate the
268 necessary operations to
270 1. Add all payloads from the staging tag where a start validity of (0, 0) is
271 replaced by the starting run for which this update should be valid.
272 2. close all iovs for payloads in the running tags just before the
273 corresponding iov of the same payload in the staging tag, so either at the
274 first run for the update to be valid or later
275 3. Optionally, make sure all payloads in the staging tag end in an open iov.
279 running tag contains ::
281 payload1, rev 1, valid from 0,1 to 1,0
282 payload1, rev 2, valid from 1,1 to -1,-1
283 payload2, rev 1, valid from 0,1 to -1,-1
284 payload3, rev 1, valid from 0,1 to 1,0
285 payload4, rev 1, valid from 0,1 to -1,-1
286 payload5, rev 1, valid from 0,1 to -1,-1
288 staging tag contains ::
290 payload1, rev 3, valid from 0,0 to 1,8
291 payload1, rev 4, valid from 1,9 to 1,20
292 payload2, rev 2, valid from 1,5 to 1,20
293 payload3, rev 2, valid from 0,0 to -1,-1
294 payload4, rev 1, valid from 0,0 to 1,20
296 Then running ``b2conditionsdb tag runningupdate running staging --run 1 2 --allow-closed``,
297 the running globaltag after the update will contain ::
299 payload1, rev 1, valid from 0,1 to 1,0
300 payload1, rev 2, valid from 1,1 to 1,1
301 payload1, rev 3, valid from 1,2 to 1,8
302 payload1, rev 4, valid from 1,9 to 1,20
303 payload2, rev 1, valid from 0,1 to 1,4
304 payload2, rev 2, valid from 1,5 to 1,20
305 payload3, rev 1, valid from 0,1 to 1,0
306 payload3, rev 2, valid from 1,2 to -1,-1
307 payload4, rev 1, valid from 0,1 to 1,20
308 payload5, rev 1, valid from 0,1 to -1,-1
312 - the start of payload1 and payload3 in staging has been adjusted
313 - payload2 in the running tag has been closed at 1,4, just before the
314 validity from the staging tag
315 - payload3 was already closed in the running tag so no change is
316 performed. This might result in gaps but is intentional
317 - payload4 was not closed at run 1,2 because the staging tag had the same
318 revision of the payload so these were merged into one long validity.
319 - payload5 was not closed as there was no update to it in the staging tag.
320 If we would have run with ``--full-replacement`` it would have been closed.
321 - if we would have chosen ``--run 1 1`` the update would have failed because
322 payload1, rev2 in running starts at 1,1 so we would have a conflict
323 - if we would have chosen ``--run 1 6`` the update would have failed because
324 payload2 in the staging tag starts before this run
325 - if we would have chosen to open the final iovs in staging by using
326 ``--fix-closed``, payload1, rev 4; payload2, rev 2 and payload4 rev 1
327 would be valid until -1,-1 after the running tag. In fact, payload 4
328 would not be changed at all.
331 args.add_argument(
"running", help=
"name of the running globaltag")
332 args.add_argument(
"staging", help=
"name of the staging globaltag")
333 group = args.add_argument_group(
"required named arguments")
334 group.add_argument(
"-r",
"--run", required=
True, nargs=2, type=int, metavar=(
"EXP",
"RUN"),
335 help=
"First experiment + run number for which the update should be "
336 "valid. Two numbers separated by space")
337 choice = args.add_mutually_exclusive_group()
338 choice.add_argument(
"--allow-closed", dest=
"mode", action=
"store_const",
339 const=RunningTagUpdateMode.ALLOW_CLOSED,
340 default=RunningTagUpdateMode.STRICT,
341 help=
"if given allow payloads in the staging tag to not "
342 "be open, i.e. they don't have to be open ended in the "
343 "update. Useful to retire a payload by adding one last update")
344 choice.add_argument(
"--fix-closed", dest=
"mode", action=
"store_const",
345 const=RunningTagUpdateMode.FIX_CLOSED,
346 help=
"if given automatically open the last iov for each "
347 "payload in staging if it is closed.")
348 choice.add_argument(
"--simple-mode", dest=
"mode", action=
"store_const",
349 const=RunningTagUpdateMode.SIMPLE,
350 help=
"if given require the staging tag to solely consist "
351 "of fully infinite validities: Only one iov per payload "
352 "with a validity of (0,0,-1,-1)")
353 choice.add_argument(
"--full-replacement", dest=
"mode", action=
"store_const",
354 const=RunningTagUpdateMode.FULL_REPLACEMENT,
355 help=
"if given perform a full replacement and close all "
356 "open iovs in the running tag not present in the staging tag. "
357 "After such an update exactly the payloads in the staging tag "
358 "will be valid after the given run. This allows for closed iovs "
359 "in the staging tag as with ``--allow-closed``")
360 args.add_argument(
"--dry-run", default=
False, action=
"store_true",
361 help=
"Only show the changes, don't try to apply them")
365 updater = RunningTagUpdater(db, args.running, args.staging, args.run, args.mode, args.dry_run)
366 operations = updater.calculate_update()
367 except RunningTagUpdaterError
as e:
368 B2ERROR(e, **e.extra_vars)
373 B2INFO(
"Nothing to do, please check the globaltags given are correct")
378 last_valid = tuple(args.run)
380 "first valid run": last_valid,
381 "payloads closed": 0,
382 "payloads updated": 0,
383 "payload iovs added": 0,
384 "next possible update": last_valid,
387 updated_payloads = set()
388 for op, payload
in operations:
391 summary[
'payloads closed'] += 1
393 updated_payloads.add(payload.name)
394 summary[
'payload iovs added'] += 1
396 last_valid = max(payload.iov[:2], last_valid)
398 if payload.iov[2:] != (-1, -1):
399 last_valid = max(payload.iov[2:], last_valid)
401 table.append([op, payload.name, payload.revision] + list(payload.iov))
404 summary[
'next possible update'] = (last_valid[0] + (1
if last_valid[1] < 0
else 0), last_valid[1] + 1)
406 summary[
'payloads updated'] = len(updated_payloads)
409 support_color = LogPythonInterface.terminal_supports_colors()
411 def color_row(row, _, line):
412 """Color the line if the operation in the first column of the row is CLOSE"""
413 if not support_color:
415 begin =
"" if row[0] !=
"CLOSE" else "\x1b[31m"
417 return begin + line + end
420 table.sort(key=
lambda x: (x[1], x[3:], x[2], x[0]))
421 table.insert(0, [
"Action",
"Payload",
"Rev",
"First Exp",
"First Run",
"Final Exp",
"Final Run"])
422 columns = [6,
'*', -6, 6, 6, 6, 6]
424 with Pager(f
"Changes to running tag {args.running}:",
True):
425 B2INFO(f
"Changes to be applied to the running tag {args.running}")
426 pretty_print_table(table, columns, transform=color_row)
429 B2INFO(
"Running in dry mode, not applying any changes.", **summary)
432 B2WARNING(
"Applying these changes cannot be undone and further updates to "
433 "this run range will **NOT** be possible", **summary)
435 answer = input(
"Are you sure you want to continue? [yes/No]: ")
436 while answer.lower().strip()
not in [
'yes',
'no',
'n',
'']:
437 answer = input(
"Please enter 'yes' or 'no': ")
439 if answer.lower().strip() !=
'yes':
440 B2INFO(
"Aborted by user ...")
445 updater.apply_update()
448 except RunningTagUpdaterError
as e:
449 B2ERROR(e, **e.extra_vars)