Belle II Software  release-05-01-25
cli_management.py
1 import functools
2 import time
3 from concurrent.futures import ThreadPoolExecutor
4 from collections import defaultdict
5 from basf2 import B2INFO, B2ERROR, B2WARNING, LogPythonInterface # noqa
6 from basf2.utils import pretty_print_table
7 from terminal_utils import Pager
8 from conditions_db.iov import IoVSet, IntervalOfValidity
9 from conditions_db.runningupdate import RunningTagUpdater, RunningTagUpdaterError, RunningTagUpdateMode
10 
11 
def get_all_iovsets(existing_payloads, run_range=None):
    """Collapse per-iov payload entries into one entry per (name, revision).

    Every element of ``existing_payloads`` is expected to expose ``name``,
    ``revision`` and ``iov``. All intervals belonging to the same name and
    revision are collected into a single ``IoVSet`` which is then stored back
    on the payload object (its ``iov_id`` is reset to ``None``).

    Parameters:
        existing_payloads: iterable of payload information objects
        run_range: optional interval; if given each iov is intersected with it
            and payloads with nothing left inside the range are skipped

    Returns:
        list of payload information objects, one per (name, revision), in the
        order in which each combination was first encountered. The returned
        objects are the input objects, modified in place.

    Overlapping intervals between entries of the same payload name are
    reported with B2ERROR; processing continues afterwards.
    """
    # iovs per (name, revision): the same revision may legitimately touch itself
    merged = defaultdict(lambda: IoVSet(allow_overlaps=True))
    # iovs per name: used purely to detect overlaps between entries
    coverage = defaultdict(lambda: IoVSet(allow_overlaps=False))
    # last payload object seen for each (name, revision)
    representative = {}

    for entry in existing_payloads:
        validity = IntervalOfValidity(entry.iov)
        if run_range is not None:
            # restrict to the requested range, skip if nothing remains
            validity &= run_range
            if not validity:
                continue

        key = (entry.name, entry.revision)
        merged[key].add(validity)
        representative[key] = entry

        # adding to the overlap-free set raises ValueError on a conflict
        try:
            coverage[entry.name].add(validity)
        except ValueError as e:
            B2ERROR(f"Overlap for payload {entry.name}: {e}")

    # hand back the payload objects, now carrying the merged iov sets
    result = []
    for key, iovset in merged.items():
        info = representative[key]
        info.iov = iovset
        info.iov_id = None
        result.append(info)

    return result
51 
52 
def create_iov_wrapper(db, globaltag_id, payload):
    """
    Create every iov of one payload in the globaltag with the given id.

    Calls ``db.create_iov`` once per interval in ``payload.iov``, passing the
    elements of the interval's ``tuple`` as trailing positional arguments.

    Raises:
        RuntimeError: as soon as ``db.create_iov`` returns ``None`` for one of
            the intervals; remaining intervals are not attempted.
    """
    for interval in payload.iov:
        created = db.create_iov(globaltag_id, payload.payload_id, *interval.tuple)
        if created is not None:
            continue
        raise RuntimeError(f"Cannot create iov for {payload.name} r{payload.revision}")
60 
61 
def command_tag_merge(args, db=None):
    """
    Merge a list of globaltags in the order they are given.

    This command allows to merge a number of globaltags into a single globaltag.
    Payloads from later globaltags in the list of arguments are used to fill gaps
    present in earlier globaltags.

    The result is equivalent to having multiple globaltags setup in the conditions
    access for basf2 (highest priority goes first).

    Warning:
      The order of the globaltags is highest priority first, so payloads from
      globaltags earlier on the command line will be taken before payloads
      from later tags.

    This command requires that all globaltags are overlap free.

    For each globaltag in the list we copy all payloads to the output globaltag
    if there is no payload of that name valid for the given interval of validity
    in any previous globaltags in the list.

    If the payload overlaps partially with a payload from a previous globaltag
    in the list the interval of validity is shortened (and possibly split) to
    not overlap but to just fill the gaps.

    For example:

    globaltag A contains ::

        payload1, rev 2, valid from 1,0 to 1,10
        payload1, rev 3, valid from 1,20 to 1,22
        payload2, rev 1, valid from 1,0 to 1,-1

    globaltag B contains ::

        payload1, rev 1, valid from 1,1 to 1,30
        payload2, rev 2, valid from 0,1 to 1,20

    Then running ``b2conditionsdb tag merge -o output A B``, the output globaltag
    after the merge will contain::

        payload1, rev 2, valid from 1,0 to 1,10
        payload1, rev 1, valid from 1,11 to 1,19
        payload1, rev 3, valid from 1,20 to 1,22
        payload1, rev 1, valid from 1,23 to 1,30
        payload2, rev 2, valid from 0,1 to 0,-1
        payload2, rev 1, valid from 1,0 to 1,-1

    When finished, this command will print a table of payloads and their
    validity and from which globaltag they were taken. If ``--dry-run`` is given
    it will only print the list of payloads.

    Optionally one can specify ``--run-range`` to limit the run range for which
    the merging should take place. In the example above, running with
    ``--run-range 1 0 1 21`` the result would be ::

        payload1, rev 2, valid from 1,0 to 1,10
        payload1, rev 1, valid from 1,11 to 1,19
        payload1, rev 3, valid from 1,20 to 1,21
        payload2, rev 1, valid from 1,0 to 1,21

    The function has two modes: called with ``db=None`` it only registers its
    command line arguments on ``args`` (an argparse parser) and returns
    ``None``. Called with a database object it performs the merge using the
    parsed arguments in ``args`` and returns 0 on success and 1 on any error.

    .. versionadded:: release-05-01-00
    """

    if db is None:
        # argument registration mode: ``args`` is the parser for this command
        args.add_argument("globaltag", nargs="+", help="name of the globaltag")
        group = args.add_argument_group("required named arguments")
        group.add_argument("-o", "--output", required=True, help="Name of the output globaltag")
        args.add_argument("--dry-run", help="Don't do anything, just print a table with the results",
                          action="store_true", default=False)
        args.add_argument("--run-range", nargs=4, default=None, type=int,
                          metavar=("FIRST_EXP", "FIRST_RUN", "FINAL_EXP", "FINAL_RUN"),
                          help="Can be four numbers to limit the run range to put "
                          "in the output globaltag: All iovs will be limited to "
                          "be in this range.")
        args.add_argument("-j", type=int, default=10, dest="nprocess",
                          help="Number of concurrent threads to use for "
                          "creating payloads into the output globaltag.")
        return

    # prepare some colors for easy distinction of source tag
    support_color = LogPythonInterface.terminal_supports_colors()
    if support_color:
        # only four colors are available, any further tags are printed uncolored
        colors = "\x1b[32m \x1b[34m \x1b[35m \x1b[31m".split()
        colors = {tag: color for tag, color in zip(args.globaltag, colors)}

    def color_row(row, _, line):
        """Color the lines depending on which globaltag the payload comes from"""
        if not support_color:
            return line
        # the source globaltag is the last column of each row
        begin = colors.get(row[-1], "")
        end = '\x1b[0m'
        return begin + line + end

    with Pager("Result of merging globaltags", True):
        # make sure output tag exists
        output_id = db.get_globalTagInfo(args.output)
        if output_id is None:
            B2ERROR("Output globaltag doesn't exist. Please create it first with a proper description")
            # non-zero so that the failure is visible in the exit status,
            # consistent with the other error path of this command
            return 1

        output_id = output_id["globalTagId"]

        # check all globaltags exist
        # NOTE(review): no message is printed here, presumably
        # get_globalTagInfo already reports the missing tag -- confirm
        if any(db.get_globalTagInfo(tag) is None for tag in args.globaltag):
            return 1

        # payloads (with reduced iov sets) to be created in the output tag
        final = []
        # rows to show to the user: one per resulting iov
        table = []
        # coverage already claimed per payload name by higher priority tags
        existing = defaultdict(lambda: IoVSet(allow_overlaps=True))
        if args.run_range is not None:
            args.run_range = IntervalOfValidity(args.run_range)

        # For each globaltag
        for tag in args.globaltag:
            # get all payload information objects with their iovs already merged to iovset instances
            payloads = get_all_iovsets(db.get_all_iovs(tag), args.run_range)
            for payload in payloads:
                # make sure it doesn't overlap with any of the previous
                payload.iov.remove(existing[payload.name])
                # but if there's something left
                if payload.iov:
                    # extend the known coverage of this payload
                    existing[payload.name] |= payload.iov
                    # extend this payload to the list to create later
                    final.append(payload)
                    # and add all separate iovs to the table to show the user
                    for iov in payload.iov:
                        table.append([payload.name, payload.revision] + list(iov.tuple) + [tag])

        # sort the table by payload name and start of validity. Rows are
        # [name, rev, first exp, first run, final exp, final run, source], so
        # the start of validity is columns 2 and 3
        table.sort(key=lambda r: (r[0], r[2:4]))

        # and fancy print it ...
        table.insert(0, ["Name", "Rev", "First Exp", "First Run", "Final Exp", "Final Run", "Source"])
        columns = ["+", -8, 6, 6, 6, 6, max(len(_) for _ in args.globaltag)]

        B2INFO(f"Result of merging the globaltags {', '.join(args.globaltag)}")

        pretty_print_table(table, columns, transform=color_row)

    # Ok, we're still alive, create all the payloads using multiple threads.
    if not args.dry_run:
        # the table contains one header row, hence the -1
        B2INFO(f'Now copying the {len(final)} payloads into {args.output} to create {len(table)-1} iovs ...')
        create_iov = functools.partial(create_iov_wrapper, db, output_id)
        try:
            with ThreadPoolExecutor(max_workers=args.nprocess) as pool:
                start = time.monotonic()
                # iterating over the results re-raises any exception from the workers
                for done, _ in enumerate(pool.map(create_iov, final), 1):
                    # linear extrapolation from the average time per payload so far
                    eta = (time.monotonic() - start) / done * (len(final) - done)
                    B2INFO(f"{done}/{len(final)} payloads copied, ETA: {eta:.1f} seconds")
        except RuntimeError:
            B2ERROR("Not all iovs could be created. This could be a server/network problem "
                    "or the destination globaltag was not empty or not writeable. Please make "
                    "sure the target tag is empty and try again")
            return 1

    return 0
221 
222 
def command_tag_runningupdate(args, db=None):
    """
    Update a running globaltag with payloads from a staging tag

    This command will calculate and apply the necessary updates to a running
    globaltag with a given staging globaltag

    Running tags are defined as "immutable for existing data but conditions for
    newer runs may be added" and the only modification allowed is to add new
    payloads for new runs or close existing payloads to no longer be valid for
    new runs.

    This command takes previously prepared and validated payloads in a staging
    globaltag and will then calculate which payloads to close and what to add to
    the running globaltag.

    For this to work we require

    1. A running globaltag in the state "RUNNING"

    2. A (experiment, run) number from which run on the update should be valid.
       This run number needs to be

       a) bigger than the start of validity for all iovs in the running tag
       b) bigger than the end of validity for all closed iovs (i.e. not valid
          to infinity) in the running tag

    3. A staging globaltag with the new payloads in state "VALIDATED"

       a) payloads in the staging tag starting at (0,0) will be interpreted as
          starting at the first valid run for the update
       b) all other payloads need to start at or after the first valid run for
          the update.
       c) The globaltag needs to be gap and overlap free
       d) All payloads in the staging tag should have as last iov an open iov
          (i.e. valid to infinity) but this can be disabled.

    The script will check all the above requirements and will then calculate the
    necessary operations to

    1. Add all payloads from the staging tag where a start validity of (0, 0) is
       replaced by the starting run for which this update should be valid.
    2. close all iovs for payloads in the running tags just before the
       corresponding iov of the same payload in the staging tag, so either at the
       first run for the update to be valid or later
    3. Optionally, make sure all payloads in the staging tag end in an open iov.

    Example:

    running tag contains ::

        payload1, rev 1, valid from 0,1 to 1,0
        payload1, rev 2, valid from 1,1 to -1,-1
        payload2, rev 1, valid from 0,1 to -1,-1
        payload3, rev 1, valid from 0,1 to 1,0
        payload4, rev 1, valid from 0,1 to -1,-1
        payload5, rev 1, valid from 0,1 to -1,-1

    staging tag contains ::

        payload1, rev 3, valid from 0,0 to 1,8
        payload1, rev 4, valid from 1,9 to 1,20
        payload2, rev 2, valid from 1,5 to 1,20
        payload3, rev 2, valid from 0,0 to -1,-1
        payload4, rev 1, valid from 0,0 to 1,20

    Then running ``b2conditionsdb tag runningupdate running staging --run 1 2 --allow-closed``,
    the running globaltag after the update will contain ::

        payload1, rev 1, valid from 0,1 to 1,0
        payload1, rev 2, valid from 1,1 to 1,1
        payload1, rev 3, valid from 1,2 to 1,8
        payload1, rev 4, valid from 1,9 to 1,20
        payload2, rev 1, valid from 0,1 to 1,4
        payload2, rev 2, valid from 1,5 to 1,20
        payload3, rev 1, valid from 0,1 to 1,0
        payload3, rev 2, valid from 1,2 to -1,-1
        payload4, rev 1, valid from 0,1 to 1,20
        payload5, rev 1, valid from 0,1 to -1,-1

    Note that

    - the start of payload1 and payload3 in staging has been adjusted
    - payload2 in the running tag has been closed at 1,4, just before the
      validity from the staging tag
    - payload3 was already closed in the running tag so no change is
      performed. This might result in gaps but is intentional
    - payload4 was not closed at run 1,2 because the staging tag had the same
      revision of the payload so these were merged to one long validity.
    - payload5 was not closed as there was no update to it in the staging tag.
      If we would have run with ``--full-replacement`` it would have been closed.
    - if we would have chosen ``--run 1 1`` the update would have failed because
      payload1, rev2 in running starts at 1,1 so we would have a conflict
    - if we would have chosen ``--run 1 6`` the update would have failed because
      payload2 in the staging tag starts before this run
    - if we would have chosen to open the final iovs in staging by using
      ``--fix-closed``, payload1, rev 4; payload2, rev 2 and payload4 rev 1
      would be valid until -1,-1 after the running tag. In fact, payload 4
      would not be changed at all.

    The function has two modes: called with ``db=None`` it only registers its
    command line arguments on ``args`` (an argparse parser) and returns
    ``None``. Called with a database object it calculates, shows and, after
    confirmation by the user, applies the update. In that case it returns 0 on
    success (or after a dry run) and 1 on error, if there is nothing to do or
    if the user aborted.
    """
    if db is None:
        # argument registration mode: ``args`` is the parser for this command
        args.add_argument("running", help="name of the running globaltag")
        args.add_argument("staging", help="name of the staging globaltag")
        group = args.add_argument_group("required named arguments")
        group.add_argument("-r", "--run", required=True, nargs=2, type=int, metavar=("EXP", "RUN"),
                           help="First experiment + run number for which the update should be "
                           "valid. Two numbers separated by space")
        # all mode flags write to the same destination; the default mode is
        # declared on the first of them
        choice = args.add_mutually_exclusive_group()
        choice.add_argument("--allow-closed", dest="mode", action="store_const",
                            const=RunningTagUpdateMode.ALLOW_CLOSED,
                            default=RunningTagUpdateMode.STRICT,
                            help="if given allow payloads in the staging tag to not "
                            "be open, i.e. they don't have to be open ended in the "
                            "update. Useful to retire a payload by adding one last update")
        choice.add_argument("--fix-closed", dest="mode", action="store_const",
                            const=RunningTagUpdateMode.FIX_CLOSED,
                            help="if given automatically open the last iov for each "
                            "payload in staging if it is closed.")
        choice.add_argument("--simple-mode", dest="mode", action="store_const",
                            const=RunningTagUpdateMode.SIMPLE,
                            help="if given require the staging tag to solely consist "
                            "of fully infinite validities: Only one iov per payload "
                            "with a validity of (0,0,-1,-1)")
        choice.add_argument("--full-replacement", dest="mode", action="store_const",
                            const=RunningTagUpdateMode.FULL_REPLACEMENT,
                            help="if given perform a full replacement and close all "
                            "open iovs in the running tag not present in the staging tag. "
                            "After such an update exactly the payloads in the staging tag "
                            "will be valid after the given run. This allows for closed iovs "
                            "in the staging tag as with ``--allow-closed``")
        args.add_argument("--dry-run", default=False, action="store_true",
                          help="Only show the changes, don't try to apply them")
        return

    try:
        updater = RunningTagUpdater(db, args.running, args.staging, args.run, args.mode)
        operations = updater.calculate_update()
    except RunningTagUpdaterError as e:
        B2ERROR(e, **e.extra_vars)
        return 1

    # make sure we exit if we have nothing to do
    if not operations:
        B2INFO("Nothing to do, please check the globaltags given are correct")
        return 1

    def run_order(exp_run):
        """Ordering key for (exp, run) pairs: a negative run number stands for
        the end of the experiment and so sorts after every real run of it"""
        exp, run = exp_run
        return (exp, float("inf") if run < 0 else run)

    # show operations in a table and some summary
    table = []
    last_valid = tuple(args.run)
    summary = {
        "first valid run": last_valid,
        "payloads closed": 0,
        "payloads updated": 0,
        "payload iovs added": 0,
        "next possible update": last_valid,
    }

    updated_payloads = set()
    for op, payload in operations:
        # calculate how many payloads/iovs will be closed/added
        if op == "CLOSE":
            summary['payloads closed'] += 1
        else:
            updated_payloads.add(payload.name)
            summary['payload iovs added'] += 1
            # remember the highest run number of any iov, so first run
            last_valid = max(payload.iov[:2], last_valid, key=run_order)
            # and final run if not open
            if payload.iov[2:] != (-1, -1):
                last_valid = max(payload.iov[2:], last_valid, key=run_order)
        # and add to the table of operations to be shown
        table.append([op, payload.name, payload.revision] + list(payload.iov))

    # calculate the next free run: the run after the last one touched or, if
    # that was the end of an experiment (run -1), run 0 of the next experiment
    summary['next possible update'] = (last_valid[0] + (1 if last_valid[1] < 0 else 0), last_valid[1] + 1)
    # and the number of distinct payloads
    summary['payloads updated'] = len(updated_payloads)

    # prepare some colors for easy distinction of closed payloads
    support_color = LogPythonInterface.terminal_supports_colors()

    def color_row(row, _, line):
        """Print rows of CLOSE operations in red, leave all other rows uncolored"""
        if not support_color:
            return line
        begin = "" if row[0] != "CLOSE" else "\x1b[31m"
        end = '\x1b[0m'
        return begin + line + end

    # and then show the table, sorted by payload name, validity, revision and action
    table.sort(key=lambda x: (x[1], x[3:], x[2], x[0]))
    table.insert(0, ["Action", "Payload", "Rev", "First Exp", "First Run", "Final Exp", "Final Run"])
    columns = [6, '*', -6, 6, 6, 6, 6]

    with Pager(f"Changes to running tag {args.running}:", True):
        B2INFO(f"Changes to be applied to the running tag {args.running}")
        pretty_print_table(table, columns, transform=color_row)

    if args.dry_run:
        B2INFO("Running in dry mode, not applying any changes.", **summary)
        return 0

    B2WARNING("Applying these changes cannot be undone and further updates to "
              "this run range will **NOT** be possible", **summary)
    # ask if the user really knows what they're doing
    answer = input("Are you sure you want to continue? [yes/No]: ")
    while answer.lower().strip() not in ['yes', 'no', 'n', '']:
        answer = input("Please enter 'yes' or 'no': ")

    # only an explicit "yes" continues, an empty answer counts as "no"
    if answer.lower().strip() != 'yes':
        B2INFO("Aborted by user ...")
        return 1

    # Ok, all set ... apply the update
    try:
        updater.apply_update()
        B2INFO("done")
        return 0
    except RunningTagUpdaterError as e:
        B2ERROR(e, **e.extra_vars)
        return 1
basf2.utils
Definition: utils.py:1
conditions_db.runningupdate
Definition: runningupdate.py:1
conditions_db.iov
Definition: iov.py:1