10 from concurrent.futures
import ThreadPoolExecutor
11 from collections
import defaultdict
12 from basf2
import B2INFO, B2ERROR, B2WARNING, LogPythonInterface
14 from terminal_utils
import Pager
19 def get_all_iovsets(existing_payloads, run_range=None):
20 """Given a list of PayloadInformation objects, return a reduced list PayloadInformation
21 objects with the single iovs replaced with IoVSets. Payloads with the same
22 name and revision will be merged.
24 Overlaps will raise an B2ERROR
26 all_payloads = defaultdict(
lambda: IoVSet(allow_overlaps=
True))
27 by_name = defaultdict(
lambda: IoVSet(allow_overlaps=
False))
29 for payload
in existing_payloads:
31 iov = IntervalOfValidity(payload.iov)
33 if run_range
is not None:
39 all_payloads[payload.name, payload.revision].add(iov)
41 infos[payload.name, payload.revision] = payload
45 by_name[payload.name].add(iov)
46 except ValueError
as e:
47 B2ERROR(f
"Overlap for payload {payload.name} r{payload.revision}: {e}")
51 for (name, revision), iov
in all_payloads.items():
52 info = infos[name, revision]
60 def create_iov_wrapper(db, globaltag_id, payload):
62 Wrapper function for adding payloads into a given globaltag.
64 for iov
in payload.iov:
65 if db.create_iov(globaltag_id, payload.payload_id, *iov.tuple)
is None:
66 raise RuntimeError(f
"Cannot create iov for {payload.name} r{payload.revision}")
69 def command_tag_merge(args, db=None):
71 Merge a list of globaltags in the order they are given.
73 This command allows to merge a number of globaltags into a single globaltag.
74 Payloads from later globaltags in the list of arguments are used to fill gaps
75 present in earlier globaltags.
77 The result is equivalent to having multiple globaltags setup in the conditions
78 access for basf2 (highest priority goes first).
81 The order of the globaltags is highest priority first, so payloads from
82 globaltags earlier on the command line will be taken with before globaltags
85 This command requires that all globaltags are overlap free.
87 For each globaltag in the list we copy all payloads to the output globaltag
88 if there is no payload of that name valid for the given interval of validity
89 in any previous globaltags in the list.
91 If the payload overlaps partially with a payload from a previous globaltag
92 in the list the interval of validity is shortened (and possibly split) to
93 not overlap but to just fill the gaps.
97 globaltag A contains ::
99 payload1, rev 2, valid from 1,0 to 1,10
100 payload1, rev 3, valid from 1,20 to 1,22
101 payload2, rev 1, valid from 1,0 to 1,-1
103 globaltag B contains ::
105 payload1, rev 1, valid from 1,1 to 1,30
106 payload2, rev 2, valid from 0,1 to 1,20
108 Then running ``b2conditionsdb tag merge -o output A B``, the output globaltag
109 after the merge will contain::
111 payload1, rev 2, valid from 1,0 to 1,10
112 payload1, rev 1, valid from 1,11 to 1,19
113 payload1, rev 3, valid from 1,20 to 1,22
114 payload1, rev 1, valid from 1,23 to 1,30
115 payload2, rev 2, valid from 0,1 to 0,-1
116 payload2, rev 1, valid from 1,0 to 1,-1
118 When finished, this command will print a table of payloads and their
119 validity and from which globaltag they were taken. If ``--dry-run`` is given
120 it will only print the list of payloads.
122 Optionally one can specify ``--run-range`` to limit the run range for which
123 the merging should take place. In the example above, running with
124 ``--run-range 1 0 1 21`` the result would be ::
126 payload1, rev 2, valid from 1,0 to 1,10
127 payload1, rev 1, valid from 1,11 to 1,19
128 payload1, rev 3, valid from 1,20 to 1,21
129 payload2, rev 1, valid from 1,0 to 1,21
131 .. versionadded:: release-05-01-00
135 args.add_argument(
"globaltag", nargs=
"+", help=
"name of the globaltag")
136 group = args.add_argument_group(
"required named arguments")
137 group.add_argument(
"-o",
"--output", required=
True, help=
"Name of the output globaltag")
138 args.add_argument(
"--dry-run", help=
"Don't do anything, just print a table with the results",
139 action=
"store_true", default=
False)
140 args.add_argument(
"--run-range", nargs=4, default=
None, type=int,
141 metavar=(
"FIRST_EXP",
"FIRST_RUN",
"FINAL_EXP",
"FINAL_RUN"),
142 help=
"Can be for numbers to limit the run range to put"
143 "in the output globaltag: All iovs will be limited to "
145 args.add_argument(
"-j", type=int, default=10, dest=
"nprocess",
146 help=
"Number of concurrent threads to use for "
147 "creating payloads into the output globaltag.")
151 support_color = LogPythonInterface.terminal_supports_colors()
153 colors =
"\x1b[32m \x1b[34m \x1b[35m \x1b[31m".split()
154 colors = {tag: color
for tag, color
in zip(args.globaltag, colors)}
156 def color_row(row, _, line):
157 """Color the lines depending on which globaltag the payload comes from"""
158 if not support_color:
160 begin = colors.get(row[-1],
"")
162 return begin + line + end
164 with Pager(
"Result of merging globaltags",
True):
166 output_id = db.get_globalTagInfo(args.output)
167 if output_id
is None:
168 B2ERROR(
"Output globaltag doesn't exist. Please create it first with a proper description")
171 output_id = output_id[
"globalTagId"]
174 if any(db.get_globalTagInfo(tag)
is None for tag
in args.globaltag):
179 existing = defaultdict(
lambda: IoVSet(allow_overlaps=
True))
180 if args.run_range
is not None:
181 args.run_range = IntervalOfValidity(args.run_range)
184 for tag
in args.globaltag:
186 all_payloads = db.get_all_iovs(tag)
188 all_payloads.sort(key=
lambda p: p.revision, reverse=
True)
191 all_payloads.sort(key=
lambda p: p.name, reverse=
False)
193 payloads = get_all_iovsets(all_payloads, args.run_range)
194 for payload
in payloads:
196 payload.iov.remove(existing[payload.name])
200 existing[payload.name] |= payload.iov
202 final.append(payload)
204 for iov
in payload.iov:
205 table.append([payload.name, payload.revision] + list(iov.tuple) + [tag])
208 table.sort(key=
lambda r: (r[0], r[3:5]))
211 table.insert(0, [
"Name",
"Rev",
"First Exp",
"First Run",
"Final Exp",
"Final Run",
"Source"])
212 columns = [
"+", -8, 6, 6, 6, 6, max(len(_)
for _
in args.globaltag)]
214 B2INFO(f
"Result of merging the globaltags {', '.join(args.globaltag)}")
216 pretty_print_table(table, columns, transform=color_row)
220 B2INFO(f
'Now copying the {len(final)} payloads into {args.output} to create {len(table)-1} iovs ...')
221 create_iov = functools.partial(create_iov_wrapper, db, output_id)
223 with ThreadPoolExecutor(max_workers=args.nprocess)
as pool:
224 start = time.monotonic()
225 for payload, _
in enumerate(pool.map(create_iov, final), 1):
226 eta = (time.monotonic() - start) / payload * (len(final) - payload)
227 B2INFO(f
"{payload}/{len(final)} payloads copied, ETA: {eta:.1f} seconds")
229 B2ERROR(
"Not all iovs could be created. This could be a server/network problem "
230 "or the destination globaltag was not empty or not writeable. Please make "
231 "sure the target tag is empty and try again")
237 def command_tag_runningupdate(args, db=None):
239 Update a running globaltag with payloads from a staging tag
241 This command will calculate and apply the necessary updates to a running
242 globaltag with a given staging globaltag
244 Running tags are defined as "immutable for existing data but conditions for
245 newer runs may be added" and the only modification allowed is to add new
246 payloads for new runs or close existing payloads to no longer be valid for
249 This command takes previously prepared and validated payloads in a staging
250 globaltag and will then calculate which payloads to close and what to add to
251 the running globaltag.
253 For this to work we require
255 1. A running globaltag in the state "RUNNING"
257 2. A (experiment, run) number from which run on the update should be valid.
258 This run number needs to be
260 a) bigger than the start of validity for all iovs in the running tag
261 b) bigger than the end of validity for all closed iovs (i.e. not valid
262 to infinity) in the running tag
264 3. A staging globaltag with the new payloads in state "VALIDATED"
266 a) payloads in the staging tag starting at (0,0) will be interpreted as
267 starting at the first valid run for the update
268 b) all other payloads need to start at or after the first valid run for
270 c) The globaltag needs to be gap and overlap free
271 d) All payloads in the staging tag should have as last iov an open iov
272 (i.e. valid to infinity) but this can be disabled.
274 The script will check all the above requirements and will then calculate the
275 necessary operations to
277 1. Add all payloads from the staging tag where a start validity of (0, 0) is
278 replaced by the starting run for which this update should be valid.
279 2. close all iovs for payloads in the running tags just before the
280 corresponding iov of the same payload in the staging tag, so either at the
281 first run for the update to be valid or later
282 3. Optionally, make sure all payloads in the staging tag end in an open iov.
286 running tag contains ::
288 payload1, rev 1, valid from 0,1 to 1,0
289 payload1, rev 2, valid from 1,1 to -1,-1
290 payload2, rev 1, valid from 0,1 to -1,-1
291 payload3, rev 1, valid from 0,1 to 1,0
292 payload4, rev 1, valid from 0,1 to -1,-1
293 payload5, rev 1, valid from 0,1 to -1,-1
295 staging tag contains ::
297 payload1, rev 3, valid from 0,0 to 1,8
298 payload1, rev 4, valid from 1,9 to 1,20
299 payload2, rev 2, valid from 1,5 to 1,20
300 payload3, rev 2, valid from 0,0 to -1,-1
301 payload4, rev 1, valid from 0,0 to 1,20
303 Then running ``b2conditionsdb tag runningupdate running staging --run 1 2 --allow-closed``,
304 the running globaltag after the update will contain ::
306 payload1, rev 1, valid from 0,1 to 1,0
307 payload1, rev 2, valid from 1,1 to 1,1
308 payload1, rev 3, valid from 1,2 to 1,8
309 payload1, rev 4, valid from 1,9 to 1,20
310 payload2, rev 1, valid from 0,1 to 1,4
311 payload2, rev 2, valid from 1,5 to 1,20
312 payload3, rev 1, valid from 0,1 to 1,0
313 payload3, rev 2, valid from 1,2 to -1,-1
314 payload4, rev 1, valid from 0,1 to 1,20
315 payload5, rev 1, valid from 0,1 to -1,-1
319 - the start of payload1 and payload3 in staging has been adjusted
320 - payload2 in the running tag as been closed at 1,4, just before the
321 validity from the staging tag
322 - payload3 was already closed in the running tag so no change is
323 performed. This might result in gaps but is intentional
324 - payload4 was not closed at rim 1,2 because the staging tag had the same
325 revision of the payload so the these were merged to one long validity.
326 - payload5 was not closed as there was no update to it in the staging tag.
327 If we would have run with ``--full-replacement`` it would have been closed.
328 - if we would have chosen ``--run 1 1`` the update would have failed because
329 payload1, rev2 in running starts at 1,1 so we would have a conflict
330 - if we would have chosen ``--run 1 6`` the update would have failed because
331 payload2 in the staging tag starts before this run
332 - if we would have chosen to open the final iovs in staging by using
333 ``--fix-closed``, payload1, rev 4; payload2, rev 2 and payload4 rev 1
334 would be valid until -1,-1 after the running tag. In fact, payload 4
335 would not be changed at all.
338 args.add_argument(
"running", help=
"name of the running globaltag")
339 args.add_argument(
"staging", help=
"name of the staging globaltag")
340 group = args.add_argument_group(
"required named arguments")
341 group.add_argument(
"-r",
"--run", required=
True, nargs=2, type=int, metavar=(
"EXP",
"RUN"),
342 help=
"First experiment + run number for which the update should be "
343 "valid. Two numbers separated by space")
344 choice = args.add_mutually_exclusive_group()
345 choice.add_argument(
"--allow-closed", dest=
"mode", action=
"store_const",
346 const=RunningTagUpdateMode.ALLOW_CLOSED,
347 default=RunningTagUpdateMode.STRICT,
348 help=
"if given allow payloads in the staging tag to not "
349 "be open, i.e. they don't have to be open ended in the "
350 "update. Useful to retire a payload by adding one last update")
351 choice.add_argument(
"--fix-closed", dest=
"mode", action=
"store_const",
352 const=RunningTagUpdateMode.FIX_CLOSED,
353 help=
"if given automatically open the last iov for each "
354 "payload in staging if it is closed.")
355 choice.add_argument(
"--simple-mode", dest=
"mode", action=
"store_const",
356 const=RunningTagUpdateMode.SIMPLE,
357 help=
"if given require the staging tag to solely consist "
358 "of fully infinite validities: Only one iov per payload "
359 "with a validity of (0,0,-1,-1)")
360 choice.add_argument(
"--full-replacement", dest=
"mode", action=
"store_const",
361 const=RunningTagUpdateMode.FULL_REPLACEMENT,
362 help=
"if given perform a full replacement and close all "
363 "open iovs in the running tag not present in the staging tag. "
364 "After such an update exactly the payloads in the staging tag "
365 "will be valid after the given run. This allows for closed iovs "
366 "in the staging tag as with ``--allow-closed``")
367 args.add_argument(
"--dry-run", default=
False, action=
"store_true",
368 help=
"Only show the changes, don't try to apply them")
372 updater = RunningTagUpdater(db, args.running, args.staging, args.run, args.mode, args.dry_run)
373 operations = updater.calculate_update()
374 except RunningTagUpdaterError
as e:
375 B2ERROR(e, **e.extra_vars)
380 B2INFO(
"Nothing to do, please check the globaltags given are correct")
385 last_valid = tuple(args.run)
387 "first valid run": last_valid,
388 "payloads closed": 0,
389 "payloads updated": 0,
390 "payload iovs added": 0,
391 "next possible update": last_valid,
394 updated_payloads = set()
395 for op, payload
in operations:
398 summary[
'payloads closed'] += 1
400 updated_payloads.add(payload.name)
401 summary[
'payload iovs added'] += 1
403 last_valid = max(payload.iov[:2], last_valid)
405 if payload.iov[2:] != (-1, -1):
406 last_valid = max(payload.iov[2:], last_valid)
408 table.append([op, payload.name, payload.revision] + list(payload.iov))
411 summary[
'next possible update'] = (last_valid[0] + (1
if last_valid[1] < 0
else 0), last_valid[1] + 1)
413 summary[
'payloads updated'] = len(updated_payloads)
416 support_color = LogPythonInterface.terminal_supports_colors()
418 def color_row(row, _, line):
419 """Color the lines depending on which globaltag the payload comes from"""
420 if not support_color:
422 begin =
"" if row[0] !=
"CLOSE" else "\x1b[31m"
424 return begin + line + end
427 table.sort(key=
lambda x: (x[1], x[3:], x[2], x[0]))
428 table.insert(0, [
"Action",
"Payload",
"Rev",
"First Exp",
"First Run",
"Final Exp",
"Final Run"])
429 columns = [6,
'*', -6, 6, 6, 6, 6]
431 with Pager(f
"Changes to running tag {args.running}:",
True):
432 B2INFO(f
"Changes to be applied to the running tag {args.running}")
433 pretty_print_table(table, columns, transform=color_row)
436 B2INFO(
"Running in dry mode, not applying any changes.", **summary)
439 B2WARNING(
"Applying these changes cannot be undone and further updates to "
440 "this run range will **NOT** be possible", **summary)
442 answer = input(
"Are you sure you want to continue? [yes/No]: ")
443 while answer.lower().strip()
not in [
'yes',
'no',
'n',
'']:
444 answer = input(
"Please enter 'yes' or 'no': ")
446 if answer.lower().strip() !=
'yes':
447 B2INFO(
"Aborted by user ...")
452 updater.apply_update()
455 except RunningTagUpdaterError
as e:
456 B2ERROR(e, **e.extra_vars)