Belle II Software  light-2309-munchkin
cli_management.py
1 
8 import functools
9 import time
10 from concurrent.futures import ThreadPoolExecutor
11 from collections import defaultdict
12 from basf2 import B2INFO, B2ERROR, B2WARNING, LogPythonInterface # noqa
13 from basf2.utils import pretty_print_table
14 from terminal_utils import Pager
15 from conditions_db import set_cdb_authentication_token
16 from conditions_db.iov import IoVSet, IntervalOfValidity
17 from conditions_db.runningupdate import RunningTagUpdater, RunningTagUpdaterError, RunningTagUpdateMode
18 
19 
20 def get_all_iovsets(existing_payloads, run_range=None):
21  """Given a list of PayloadInformation objects, return a reduced list PayloadInformation
22  objects with the single iovs replaced with IoVSets. Payloads with the same
23  name and revision will be merged.
24 
25  Overlaps will raise an B2ERROR
26  """
27  all_payloads = defaultdict(lambda: IoVSet(allow_overlaps=True))
28  by_name = defaultdict(lambda: IoVSet(allow_overlaps=False))
29  infos = {}
30  for payload in existing_payloads:
31  # we want to make set of iovs for each payload
32  iov = IntervalOfValidity(payload.iov)
33  # possibly we have a run range we want to limit to
34  if run_range is not None:
35  iov &= run_range
36  if not iov:
37  continue
38 
39  # merge the iovs for the same revision
40  all_payloads[payload.name, payload.revision].add(iov)
41  # also keep the PayloadInformation
42  infos[payload.name, payload.revision] = payload
43 
44  # and also check if there are any overlaps with the same payload name
45  try:
46  by_name[payload.name].add(iov)
47  except ValueError as e:
48  B2ERROR(f"Overlap for payload {payload.name} r{payload.revision}: {e}")
49 
50  # so now flatten the thing again and return PayloadInformation objects we slightly modify
51  result = []
52  for (name, revision), iov in all_payloads.items():
53  info = infos[name, revision]
54  info.iov = iov
55  info.iov_id = None
56  result.append(info)
57 
58  return result
59 
60 
61 def create_iov_wrapper(db, globaltag_id, payload):
62  """
63  Wrapper function for adding payloads into a given globaltag.
64  """
65  for iov in payload.iov:
66  if db.create_iov(globaltag_id, payload.payload_id, *iov.tuple) is None:
67  raise RuntimeError(f"Cannot create iov for {payload.name} r{payload.revision}")
68 
69 
70 def command_tag_merge(args, db=None):
71  """
72  Merge a list of globaltags in the order they are given.
73 
74  This command allows to merge a number of globaltags into a single globaltag.
75  Payloads from later globaltags in the list of arguments are used to fill gaps
76  present in earlier globaltags.
77 
78  The result is equivalent to having multiple globaltags setup in the conditions
79  access for basf2 (highest priority goes first).
80 
81  Warning:
82  The order of the globaltags is highest priority first, so payloads from
83  globaltags earlier on the command line will be taken with before globaltags
84  from later tags.
85 
86  This command requires that all globaltags are overlap free.
87 
88  For each globaltag in the list we copy all payloads to the output globaltag
89  if there is no payload of that name valid for the given interval of validity
90  in any previous globaltags in the list.
91 
92  If the payload overlaps partially with a payload from a previous globaltag
93  in the list the interval of validity is shortened (and possibly split) to
94  not overlap but to just fill the gaps.
95 
96  For example:
97 
98  globaltag A contains ::
99 
100  payload1, rev 2, valid from 1,0 to 1,10
101  payload1, rev 3, valid from 1,20 to 1,22
102  payload2, rev 1, valid from 1,0 to 1,-1
103 
104  globaltag B contains ::
105 
106  payload1, rev 1, valid from 1,1 to 1,30
107  payload2, rev 2, valid from 0,1 to 1,20
108 
109  Then running ``b2conditionsdb tag merge -o output A B``, the output globaltag
110  after the merge will contain::
111 
112  payload1, rev 2, valid from 1,0 to 1,10
113  payload1, rev 1, valid from 1,11 to 1,19
114  payload1, rev 3, valid from 1,20 to 1,22
115  payload1, rev 1, valid from 1,23 to 1,30
116  payload2, rev 2, valid from 0,1 to 0,-1
117  payload2, rev 1, valid from 1,0 to 1,-1
118 
119  When finished, this command will print a table of payloads and their
120  validity and from which globaltag they were taken. If ``--dry-run`` is given
121  it will only print the list of payloads.
122 
123  Optionally one can specify ``--run-range`` to limit the run range for which
124  the merging should take place. In the example above, running with
125  ``--run-range 1 0 1 21`` the result would be ::
126 
127  payload1, rev 2, valid from 1,0 to 1,10
128  payload1, rev 1, valid from 1,11 to 1,19
129  payload1, rev 3, valid from 1,20 to 1,21
130  payload2, rev 1, valid from 1,0 to 1,21
131 
132  .. versionadded:: release-05-01-00
133  """
134 
135  if db is None:
136  args.add_argument("globaltag", nargs="+", help="name of the globaltag")
137  group = args.add_argument_group("required named arguments")
138  group.add_argument("-o", "--output", required=True, help="Name of the output globaltag")
139  args.add_argument("--dry-run", help="Don't do anything, just print a table with the results",
140  action="store_true", default=False)
141  args.add_argument("--run-range", nargs=4, default=None, type=int,
142  metavar=("FIRST_EXP", "FIRST_RUN", "FINAL_EXP", "FINAL_RUN"),
143  help="Can be for numbers to limit the run range to put"
144  "in the output globaltag: All iovs will be limited to "
145  "be in this range.")
146  args.add_argument("-j", type=int, default=10, dest="nprocess",
147  help="Number of concurrent threads to use for "
148  "creating payloads into the output globaltag.")
149  return
150 
151  if not args.dry_run:
152  set_cdb_authentication_token(db, args.auth_token)
153 
154  # prepare some colors for easy distinction of source tag
155  support_color = LogPythonInterface.terminal_supports_colors()
156  if support_color:
157  colors = "\x1b[32m \x1b[34m \x1b[35m \x1b[31m".split()
158  colors = {tag: color for tag, color in zip(args.globaltag, colors)}
159 
160  def color_row(row, _, line):
161  """Color the lines depending on which globaltag the payload comes from"""
162  if not support_color:
163  return line
164  begin = colors.get(row[-1], "")
165  end = '\x1b[0m'
166  return begin + line + end
167 
168  with Pager("Result of merging globaltags", True):
169  # make sure output tag exists
170  output_id = db.get_globalTagInfo(args.output)
171  if output_id is None:
172  B2ERROR("Output globaltag doesn't exist. Please create it first with a proper description")
173  return False
174 
175  output_id = output_id["globalTagId"]
176 
177  # check all globaltags exist
178  if any(db.get_globalTagInfo(tag) is None for tag in args.globaltag):
179  return False
180 
181  final = []
182  table = []
183  existing = defaultdict(lambda: IoVSet(allow_overlaps=True))
184  if args.run_range is not None:
185  args.run_range = IntervalOfValidity(args.run_range)
186 
187  # For each globaltag
188  for tag in args.globaltag:
189  # get all the payloads and iovs from the globaltag
190  all_payloads = db.get_all_iovs(tag)
191  # sort all the payloads by revision number (reversed sort: highest revisions first)
192  all_payloads.sort(key=lambda p: p.revision, reverse=True)
193  # and sort again but this time by name: not really necessary,
194  # but it helps printing the log messages ordered by payloads name
195  all_payloads.sort(key=lambda p: p.name, reverse=False)
196  # get all payload information objects with their iovs already merged to iovset instances
197  payloads = get_all_iovsets(all_payloads, args.run_range)
198  for payload in payloads:
199  # make sure it doesn't overlap with any of the previous
200  payload.iov.remove(existing[payload.name])
201  # but if there's something left
202  if payload.iov:
203  # extend the known coverage of this payload
204  existing[payload.name] |= payload.iov
205  # extend this payload to the list to create later
206  final.append(payload)
207  # and add all separate iovs to the table to show the user
208  for iov in payload.iov:
209  table.append([payload.name, payload.revision] + list(iov.tuple) + [tag])
210 
211  # sort the table by payload name and start run ... we want to display it
212  table.sort(key=lambda r: (r[0], r[3:5]))
213 
214  # and fancy print it ...
215  table.insert(0, ["Name", "Rev", "First Exp", "First Run", "Final Exp", "Final Run", "Source"])
216  columns = ["+", -8, 6, 6, 6, 6, max(len(_) for _ in args.globaltag)]
217 
218  B2INFO(f"Result of merging the globaltags {', '.join(args.globaltag)}")
219 
220  pretty_print_table(table, columns, transform=color_row)
221 
222  # Ok, we're still alive, create all the payloads using multiple processes.
223  if not args.dry_run:
224  B2INFO(f'Now copying the {len(final)} payloads into {args.output} to create {len(table)-1} iovs ...')
225  create_iov = functools.partial(create_iov_wrapper, db, output_id)
226  try:
227  with ThreadPoolExecutor(max_workers=args.nprocess) as pool:
228  start = time.monotonic()
229  for payload, _ in enumerate(pool.map(create_iov, final), 1):
230  eta = (time.monotonic() - start) / payload * (len(final) - payload)
231  B2INFO(f"{payload}/{len(final)} payloads copied, ETA: {eta:.1f} seconds")
232  except RuntimeError:
233  B2ERROR("Not all iovs could be created. This could be a server/network problem "
234  "or the destination globaltag was not empty or not writeable. Please make "
235  "sure the target tag is empty and try again")
236  return 1
237 
238  return 0
239 
240 
241 def command_tag_runningupdate(args, db=None):
242  """
243  Update a running globaltag with payloads from a staging tag
244 
245  This command will calculate and apply the necessary updates to a running
246  globaltag with a given staging globaltag
247 
248  Running tags are defined as "immutable for existing data but conditions for
249  newer runs may be added" and the only modification allowed is to add new
250  payloads for new runs or close existing payloads to no longer be valid for
251  new runs.
252 
253  This command takes previously prepared and validated payloads in a staging
254  globaltag and will then calculate which payloads to close and what to add to
255  the running globaltag.
256 
257  For this to work we require
258 
259  1. A running globaltag in the state "RUNNING"
260 
261  2. A (experiment, run) number from which run on the update should be valid.
262  This run number needs to be
263 
264  a) bigger than the start of validity for all iovs in the running tag
265  b) bigger than the end of validity for all closed iovs (i.e. not valid
266  to infinity) in the running tag
267 
268  3. A staging globaltag with the new payloads in state "VALIDATED"
269 
270  a) payloads in the staging tag starting at (0,0) will be interpreted as
271  starting at the first valid run for the update
272  b) all other payloads need to start at or after the first valid run for
273  the update.
274  c) The globaltag needs to be gap and overlap free
275  d) All payloads in the staging tag should have as last iov an open iov
276  (i.e. valid to infinity) but this can be disabled.
277 
278  The script will check all the above requirements and will then calculate the
279  necessary operations to
280 
281  1. Add all payloads from the staging tag where a start validity of (0, 0) is
282  replaced by the starting run for which this update should be valid.
283  2. close all iovs for payloads in the running tags just before the
284  corresponding iov of the same payload in the staging tag, so either at the
285  first run for the update to be valid or later
286  3. Optionally, make sure all payloads in the staging tag end in an open iov.
287 
288  Example:
289 
290  running tag contains ::
291 
292  payload1, rev 1, valid from 0,1 to 1,0
293  payload1, rev 2, valid from 1,1 to -1,-1
294  payload2, rev 1, valid from 0,1 to -1,-1
295  payload3, rev 1, valid from 0,1 to 1,0
296  payload4, rev 1, valid from 0,1 to -1,-1
297  payload5, rev 1, valid from 0,1 to -1,-1
298 
299  staging tag contains ::
300 
301  payload1, rev 3, valid from 0,0 to 1,8
302  payload1, rev 4, valid from 1,9 to 1,20
303  payload2, rev 2, valid from 1,5 to 1,20
304  payload3, rev 2, valid from 0,0 to -1,-1
305  payload4, rev 1, valid from 0,0 to 1,20
306 
307  Then running ``b2conditionsdb tag runningupdate running staging --run 1 2 --allow-closed``,
308  the running globaltag after the update will contain ::
309 
310  payload1, rev 1, valid from 0,1 to 1,0
311  payload1, rev 2, valid from 1,1 to 1,1
312  payload1, rev 3, valid from 1,2 to 1,8
313  payload1, rev 4, valid from 1,9 to 1,20
314  payload2, rev 1, valid from 0,1 to 1,4
315  payload2, rev 2, valid from 1,5 to 1,20
316  payload3, rev 1, valid from 0,1 to 1,0
317  payload3, rev 2, valid from 1,2 to -1,-1
318  payload4, rev 1, valid from 0,1 to 1,20
319  payload5, rev 1, valid from 0,1 to -1,-1
320 
321  Note that
322 
323  - the start of payload1 and payload3 in staging has been adjusted
324  - payload2 in the running tag as been closed at 1,4, just before the
325  validity from the staging tag
326  - payload3 was already closed in the running tag so no change is
327  performed. This might result in gaps but is intentional
328  - payload4 was not closed at rim 1,2 because the staging tag had the same
329  revision of the payload so the these were merged to one long validity.
330  - payload5 was not closed as there was no update to it in the staging tag.
331  If we would have run with ``--full-replacement`` it would have been closed.
332  - if we would have chosen ``--run 1 1`` the update would have failed because
333  payload1, rev2 in running starts at 1,1 so we would have a conflict
334  - if we would have chosen ``--run 1 6`` the update would have failed because
335  payload2 in the staging tag starts before this run
336  - if we would have chosen to open the final iovs in staging by using
337  ``--fix-closed``, payload1, rev 4; payload2, rev 2 and payload4 rev 1
338  would be valid until -1,-1 after the running tag. In fact, payload 4
339  would not be changed at all.
340  """
341  if db is None:
342  args.add_argument("running", help="name of the running globaltag")
343  args.add_argument("staging", help="name of the staging globaltag")
344  group = args.add_argument_group("required named arguments")
345  group.add_argument("-r", "--run", required=True, nargs=2, type=int, metavar=("EXP", "RUN"),
346  help="First experiment + run number for which the update should be "
347  "valid. Two numbers separated by space")
348  choice = args.add_mutually_exclusive_group()
349  choice.add_argument("--allow-closed", dest="mode", action="store_const",
350  const=RunningTagUpdateMode.ALLOW_CLOSED,
351  default=RunningTagUpdateMode.STRICT,
352  help="if given allow payloads in the staging tag to not "
353  "be open, i.e. they don't have to be open ended in the "
354  "update. Useful to retire a payload by adding one last update")
355  choice.add_argument("--fix-closed", dest="mode", action="store_const",
356  const=RunningTagUpdateMode.FIX_CLOSED,
357  help="if given automatically open the last iov for each "
358  "payload in staging if it is closed.")
359  choice.add_argument("--simple-mode", dest="mode", action="store_const",
360  const=RunningTagUpdateMode.SIMPLE,
361  help="if given require the staging tag to solely consist "
362  "of fully infinite validities: Only one iov per payload "
363  "with a validity of (0,0,-1,-1)")
364  choice.add_argument("--full-replacement", dest="mode", action="store_const",
365  const=RunningTagUpdateMode.FULL_REPLACEMENT,
366  help="if given perform a full replacement and close all "
367  "open iovs in the running tag not present in the staging tag. "
368  "After such an update exactly the payloads in the staging tag "
369  "will be valid after the given run. This allows for closed iovs "
370  "in the staging tag as with ``--allow-closed``")
371  args.add_argument("--dry-run", default=False, action="store_true",
372  help="Only show the changes, don't try to apply them")
373  return
374 
375  if not args.dry_run:
376  set_cdb_authentication_token(db, args.auth_token)
377 
378  try:
379  updater = RunningTagUpdater(db, args.running, args.staging, args.run, args.mode, args.dry_run)
380  operations = updater.calculate_update()
381  except RunningTagUpdaterError as e:
382  B2ERROR(e, **e.extra_vars)
383  return 1
384 
385  # make sure we exit if we have nothing to do
386  if not operations:
387  B2INFO("Nothing to do, please check the globaltags given are correct")
388  return 1
389 
390  # show operations in a table and some summary
391  table = []
392  last_valid = tuple(args.run)
393  summary = {
394  "first valid run": last_valid,
395  "payloads closed": 0,
396  "payloads updated": 0,
397  "payload iovs added": 0,
398  "next possible update": last_valid,
399  }
400 
401  updated_payloads = set()
402  for op, payload in operations:
403  # calculate how many payloads/iovs will be closed/added
404  if op == "CLOSE":
405  summary['payloads closed'] += 1
406  else:
407  updated_payloads.add(payload.name)
408  summary['payload iovs added'] += 1
409  # remember the highest run number of any iov, so first run
410  last_valid = max(payload.iov[:2], last_valid)
411  # and final run if not open
412  if payload.iov[2:] != (-1, -1):
413  last_valid = max(payload.iov[2:], last_valid)
414  # and add to the table of operations to be shown
415  table.append([op, payload.name, payload.revision] + list(payload.iov))
416 
417  # calculate the next fee run
418  summary['next possible update'] = (last_valid[0] + (1 if last_valid[1] < 0 else 0), last_valid[1] + 1)
419  # and the number of distinct payloads
420  summary['payloads updated'] = len(updated_payloads)
421 
422  # prepare some colors for easy distinction of closed payloads
423  support_color = LogPythonInterface.terminal_supports_colors()
424 
425  def color_row(row, _, line):
426  """Color the lines depending on which globaltag the payload comes from"""
427  if not support_color:
428  return line
429  begin = "" if row[0] != "CLOSE" else "\x1b[31m"
430  end = '\x1b[0m'
431  return begin + line + end
432 
433  # and then show the table
434  table.sort(key=lambda x: (x[1], x[3:], x[2], x[0]))
435  table.insert(0, ["Action", "Payload", "Rev", "First Exp", "First Run", "Final Exp", "Final Run"])
436  columns = [6, '*', -6, 6, 6, 6, 6]
437 
438  with Pager(f"Changes to running tag {args.running}:", True):
439  B2INFO(f"Changes to be applied to the running tag {args.running}")
440  pretty_print_table(table, columns, transform=color_row)
441 
442  if args.dry_run:
443  B2INFO("Running in dry mode, not applying any changes.", **summary)
444  return 0
445 
446  B2WARNING("Applying these changes cannot be undone and further updates to "
447  "this run range will **NOT** be possible", **summary)
448  # ask if the user really knows what they're doing
449  answer = input("Are you sure you want to continue? [yes/No]: ")
450  while answer.lower().strip() not in ['yes', 'no', 'n', '']:
451  answer = input("Please enter 'yes' or 'no': ")
452 
453  if answer.lower().strip() != 'yes':
454  B2INFO("Aborted by user ...")
455  return 1
456 
457  # Ok, all set ... apply the update
458  try:
459  updater.apply_update()
460  B2INFO("done")
461  return 0
462  except RunningTagUpdaterError as e:
463  B2ERROR(e, **e.extra_vars)
464  return 1