Belle II Software  release-06-02-00
cli_management.py
1 
8 import functools
9 import time
10 from concurrent.futures import ThreadPoolExecutor
11 from collections import defaultdict
12 from basf2 import B2INFO, B2ERROR, B2WARNING, LogPythonInterface # noqa
13 from basf2.utils import pretty_print_table
14 from terminal_utils import Pager
15 from conditions_db.iov import IoVSet, IntervalOfValidity
16 from conditions_db.runningupdate import RunningTagUpdater, RunningTagUpdaterError, RunningTagUpdateMode
17 
18 
def get_all_iovsets(existing_payloads, run_range=None):
    """Merge the single iovs of payloads into one IoVSet per (name, revision).

    Given an iterable of PayloadInformation objects, return a reduced list of
    PayloadInformation objects where the ``iov`` attribute has been replaced by
    an IoVSet containing all intervals found for the same payload name and
    revision. The ``iov_id`` of the returned objects is reset to None.

    Note that the returned objects are the ones passed in (the last one seen
    for each name/revision pair), modified in place.

    If two intervals of the same payload name overlap, a B2ERROR is emitted
    and processing continues.

    Parameters:
        existing_payloads: iterable of objects providing ``name``, ``revision``
            and ``iov`` attributes
        run_range: optional interval; if given every iov is intersected with
            it and payloads without any remaining validity are dropped
    """
    # one (overlap tolerant) set of intervals per payload name and revision
    merged_iovs = defaultdict(lambda: IoVSet(allow_overlaps=True))
    # one strict set per payload name, only used to detect overlaps
    coverage_by_name = defaultdict(lambda: IoVSet(allow_overlaps=False))
    # the PayloadInformation object to hand back for each name/revision
    payload_infos = {}

    for payload in existing_payloads:
        interval = IntervalOfValidity(payload.iov)
        if run_range is not None:
            # restrict to the requested range and skip if nothing remains
            interval &= run_range
            if not interval:
                continue

        key = (payload.name, payload.revision)
        merged_iovs[key].add(interval)
        payload_infos[key] = payload

        # adding to the strict set raises ValueError on overlaps between
        # revisions of the same payload name
        try:
            coverage_by_name[payload.name].add(interval)
        except ValueError as e:
            B2ERROR(f"Overlap for payload {payload.name}: {e}")

    # flatten again: reuse the PayloadInformation objects, slightly modified
    result = []
    for key, iovset in merged_iovs.items():
        info = payload_infos[key]
        info.iov = iovset
        info.iov_id = None
        result.append(info)

    return result
58 
59 
def create_iov_wrapper(db, globaltag_id, payload):
    """
    Create all iovs of one payload in the given globaltag.

    Calls ``db.create_iov`` once for every interval in ``payload.iov`` and
    raises a RuntimeError as soon as one of these calls returns None.
    """
    for interval in payload.iov:
        created = db.create_iov(globaltag_id, payload.payload_id, *interval.tuple)
        if created is not None:
            continue
        raise RuntimeError(f"Cannot create iov for {payload.name} r{payload.revision}")
67 
68 
def command_tag_merge(args, db=None):
    """
    Merge a list of globaltags in the order they are given.

    This command allows to merge a number of globaltags into a single globaltag.
    Payloads from later globaltags in the list of arguments are used to fill gaps
    present in earlier globaltags.

    The result is equivalent to having multiple globaltags setup in the conditions
    access for basf2 (highest priority goes first).

    Warning:
      The order of the globaltags is highest priority first, so payloads from
      globaltags earlier on the command line will be taken with priority over
      payloads from later tags.

    This command requires that all globaltags are overlap free.

    For each globaltag in the list we copy all payloads to the output globaltag
    if there is no payload of that name valid for the given interval of validity
    in any previous globaltags in the list.

    If the payload overlaps partially with a payload from a previous globaltag
    in the list the interval of validity is shortened (and possibly split) to
    not overlap but to just fill the gaps.

    For example:

    globaltag A contains ::

        payload1, rev 2, valid from 1,0 to 1,10
        payload1, rev 3, valid from 1,20 to 1,22
        payload2, rev 1, valid from 1,0 to 1,-1

    globaltag B contains ::

        payload1, rev 1, valid from 1,1 to 1,30
        payload2, rev 2, valid from 0,1 to 1,20

    Then running ``b2conditionsdb tag merge -o output A B``, the output globaltag
    after the merge will contain::

        payload1, rev 2, valid from 1,0 to 1,10
        payload1, rev 1, valid from 1,11 to 1,19
        payload1, rev 3, valid from 1,20 to 1,22
        payload1, rev 1, valid from 1,23 to 1,30
        payload2, rev 2, valid from 0,1 to 0,-1
        payload2, rev 1, valid from 1,0 to 1,-1

    When finished, this command will print a table of payloads and their
    validity and from which globaltag they were taken. If ``--dry-run`` is given
    it will only print the list of payloads.

    Optionally one can specify ``--run-range`` to limit the run range for which
    the merging should take place. In the example above, running with
    ``--run-range 1 0 1 21`` the result would be ::

        payload1, rev 2, valid from 1,0 to 1,10
        payload1, rev 1, valid from 1,11 to 1,19
        payload1, rev 3, valid from 1,20 to 1,21
        payload2, rev 1, valid from 1,0 to 1,21

    .. versionadded:: release-05-01-00
    """
    # Calling convention: without a ``db`` the function only registers its
    # command line arguments on ``args`` and returns None. With a ``db`` it
    # performs the merge and returns 0 on success and 1 on any failure.
    # NOTE(review): the docstring above presumably doubles as the command
    # help text, so the return values are documented here instead - confirm.

    if db is None:
        args.add_argument("globaltag", nargs="+", help="name of the globaltag")
        group = args.add_argument_group("required named arguments")
        group.add_argument("-o", "--output", required=True, help="Name of the output globaltag")
        args.add_argument("--dry-run", help="Don't do anything, just print a table with the results",
                          action="store_true", default=False)
        args.add_argument("--run-range", nargs=4, default=None, type=int,
                          metavar=("FIRST_EXP", "FIRST_RUN", "FINAL_EXP", "FINAL_RUN"),
                          help="Can be four numbers to limit the run range to put "
                          "in the output globaltag: All iovs will be limited to "
                          "be in this range.")
        args.add_argument("-j", type=int, default=10, dest="nprocess",
                          help="Number of concurrent threads to use for "
                          "creating payloads into the output globaltag.")
        return

    # prepare some colors for easy distinction of source tag: one color for
    # each of the first four globaltags, all further tags stay uncolored
    support_color = LogPythonInterface.terminal_supports_colors()
    colors = {}
    if support_color:
        palette = "\x1b[32m \x1b[34m \x1b[35m \x1b[31m".split()
        colors = dict(zip(args.globaltag, palette))

    def color_row(row, _, line):
        """Color the lines depending on which globaltag the payload comes from"""
        if not support_color:
            return line
        # the source globaltag is the last column of each row
        begin = colors.get(row[-1], "")
        end = '\x1b[0m'
        return begin + line + end

    with Pager("Result of merging globaltags", True):
        # make sure output tag exists
        output_info = db.get_globalTagInfo(args.output)
        if output_info is None:
            B2ERROR("Output globaltag doesn't exist. Please create it first with a proper description")
            # report failure with the same non-zero status as the upload error
            # below (``False`` would be indistinguishable from success, 0)
            return 1

        output_id = output_info["globalTagId"]

        # check all globaltags exist
        # NOTE(review): no message is printed here, presumably
        # get_globalTagInfo already reports the missing tag - confirm
        if any(db.get_globalTagInfo(tag) is None for tag in args.globaltag):
            return 1

        # payloads (with their remaining iovs) to create in the output tag
        final = []
        # rows to show to the user, one per created iov
        table = []
        # validity already covered by earlier globaltags, per payload name
        existing = defaultdict(lambda: IoVSet(allow_overlaps=True))
        if args.run_range is not None:
            args.run_range = IntervalOfValidity(args.run_range)

        # For each globaltag
        for tag in args.globaltag:
            # get all payload information objects with their iovs already merged to iovset instances
            payloads = get_all_iovsets(db.get_all_iovs(tag), args.run_range)
            for payload in payloads:
                # make sure it doesn't overlap with any of the previous
                payload.iov.remove(existing[payload.name])
                # but if there's something left
                if payload.iov:
                    # extend the known coverage of this payload
                    existing[payload.name] |= payload.iov
                    # extend this payload to the list to create later
                    final.append(payload)
                    # and add all separate iovs to the table to show the user
                    for iov in payload.iov:
                        table.append([payload.name, payload.revision] + list(iov.tuple) + [tag])

        # sort the table by payload name and start of validity, i.e. by
        # (first exp, first run) which are columns 2 and 3 of each row
        table.sort(key=lambda r: (r[0], r[2:4]))

        # and fancy print it ...
        table.insert(0, ["Name", "Rev", "First Exp", "First Run", "Final Exp", "Final Run", "Source"])
        columns = ["+", -8, 6, 6, 6, 6, max(len(_) for _ in args.globaltag)]

        B2INFO(f"Result of merging the globaltags {', '.join(args.globaltag)}")

        pretty_print_table(table, columns, transform=color_row)

        # Ok, we're still alive, create all the payloads using multiple threads.
        if not args.dry_run:
            B2INFO(f'Now copying the {len(final)} payloads into {args.output} to create {len(table)-1} iovs ...')
            create_iov = functools.partial(create_iov_wrapper, db, output_id)
            try:
                with ThreadPoolExecutor(max_workers=args.nprocess) as pool:
                    start = time.monotonic()
                    # pool.map yields results in submission order, so the
                    # counter is the number of payloads finished so far
                    for done, _ in enumerate(pool.map(create_iov, final), 1):
                        eta = (time.monotonic() - start) / done * (len(final) - done)
                        B2INFO(f"{done}/{len(final)} payloads copied, ETA: {eta:.1f} seconds")
            except RuntimeError:
                B2ERROR("Not all iovs could be created. This could be a server/network problem "
                        "or the destination globaltag was not empty or not writeable. Please make "
                        "sure the target tag is empty and try again")
                return 1

    return 0
228 
229 
def command_tag_runningupdate(args, db=None):
    """
    Update a running globaltag with payloads from a staging tag

    This command will calculate and apply the necessary updates to a running
    globaltag with a given staging globaltag

    Running tags are defined as "immutable for existing data but conditions for
    newer runs may be added" and the only modification allowed is to add new
    payloads for new runs or close existing payloads to no longer be valid for
    new runs.

    This command takes previously prepared and validated payloads in a staging
    globaltag and will then calculate which payloads to close and what to add to
    the running globaltag.

    For this to work we require

    1. A running globaltag in the state "RUNNING"

    2. A (experiment, run) number from which run on the update should be valid.
       This run number needs to be

       a) bigger than the start of validity for all iovs in the running tag
       b) bigger than the end of validity for all closed iovs (i.e. not valid
          to infinity) in the running tag

    3. A staging globaltag with the new payloads in state "VALIDATED"

       a) payloads in the staging tag starting at (0,0) will be interpreted as
          starting at the first valid run for the update
       b) all other payloads need to start at or after the first valid run for
          the update.
       c) The globaltag needs to be gap and overlap free
       d) All payloads in the staging tag should have as last iov an open iov
          (i.e. valid to infinity) but this can be disabled.

    The script will check all the above requirements and will then calculate the
    necessary operations to

    1. Add all payloads from the staging tag where a start validity of (0, 0) is
       replaced by the starting run for which this update should be valid.
    2. close all iovs for payloads in the running tags just before the
       corresponding iov of the same payload in the staging tag, so either at the
       first run for the update to be valid or later
    3. Optionally, make sure all payloads in the staging tag end in an open iov.

    Example:

    running tag contains ::

        payload1, rev 1, valid from 0,1 to 1,0
        payload1, rev 2, valid from 1,1 to -1,-1
        payload2, rev 1, valid from 0,1 to -1,-1
        payload3, rev 1, valid from 0,1 to 1,0
        payload4, rev 1, valid from 0,1 to -1,-1
        payload5, rev 1, valid from 0,1 to -1,-1

    staging tag contains ::

        payload1, rev 3, valid from 0,0 to 1,8
        payload1, rev 4, valid from 1,9 to 1,20
        payload2, rev 2, valid from 1,5 to 1,20
        payload3, rev 2, valid from 0,0 to -1,-1
        payload4, rev 1, valid from 0,0 to 1,20

    Then running ``b2conditionsdb tag runningupdate running staging --run 1 2 --allow-closed``,
    the running globaltag after the update will contain ::

        payload1, rev 1, valid from 0,1 to 1,0
        payload1, rev 2, valid from 1,1 to 1,1
        payload1, rev 3, valid from 1,2 to 1,8
        payload1, rev 4, valid from 1,9 to 1,20
        payload2, rev 1, valid from 0,1 to 1,4
        payload2, rev 2, valid from 1,5 to 1,20
        payload3, rev 1, valid from 0,1 to 1,0
        payload3, rev 2, valid from 1,2 to -1,-1
        payload4, rev 1, valid from 0,1 to 1,20
        payload5, rev 1, valid from 0,1 to -1,-1

    Note that

    - the start of payload1 and payload3 in staging has been adjusted
    - payload2 in the running tag has been closed at 1,4, just before the
      validity from the staging tag
    - payload3 was already closed in the running tag so no change is
      performed. This might result in gaps but is intentional
    - payload4 was not closed at run 1,2 because the staging tag had the same
      revision of the payload so these were merged to one long validity.
    - payload5 was not closed as there was no update to it in the staging tag.
      If we would have run with ``--full-replacement`` it would have been closed.
    - if we would have chosen ``--run 1 1`` the update would have failed because
      payload1, rev2 in running starts at 1,1 so we would have a conflict
    - if we would have chosen ``--run 1 6`` the update would have failed because
      payload2 in the staging tag starts before this run
    - if we would have chosen to open the final iovs in staging by using
      ``--fix-closed``, payload1, rev 4; payload2, rev 2 and payload4 rev 1
      would be valid until -1,-1 after the running tag. In fact, payload 4
      would not be changed at all.
    """
    # Without a ``db`` we are only asked to register the command line
    # arguments on ``args``; nothing is returned in that case.
    if db is None:
        args.add_argument("running", help="name of the running globaltag")
        args.add_argument("staging", help="name of the staging globaltag")
        required = args.add_argument_group("required named arguments")
        required.add_argument("-r", "--run", required=True, nargs=2, type=int, metavar=("EXP", "RUN"),
                              help="First experiment + run number for which the update should be "
                              "valid. Two numbers separated by space")
        # all mode flags write to the same ``mode`` destination and exclude
        # each other; only the first one carries the default (STRICT)
        mode_options = [
            ("--allow-closed", RunningTagUpdateMode.ALLOW_CLOSED,
             {"default": RunningTagUpdateMode.STRICT},
             "if given allow payloads in the staging tag to not "
             "be open, i.e. they don't have to be open ended in the "
             "update. Useful to retire a payload by adding one last update"),
            ("--fix-closed", RunningTagUpdateMode.FIX_CLOSED, {},
             "if given automatically open the last iov for each "
             "payload in staging if it is closed."),
            ("--simple-mode", RunningTagUpdateMode.SIMPLE, {},
             "if given require the staging tag to solely consist "
             "of fully infinite validities: Only one iov per payload "
             "with a validity of (0,0,-1,-1)"),
            ("--full-replacement", RunningTagUpdateMode.FULL_REPLACEMENT, {},
             "if given perform a full replacement and close all "
             "open iovs in the running tag not present in the staging tag. "
             "After such an update exactly the payloads in the staging tag "
             "will be valid after the given run. This allows for closed iovs "
             "in the staging tag as with ``--allow-closed``"),
        ]
        choice = args.add_mutually_exclusive_group()
        for flag, mode, extra, help_text in mode_options:
            choice.add_argument(flag, dest="mode", action="store_const",
                                const=mode, help=help_text, **extra)
        args.add_argument("--dry-run", default=False, action="store_true",
                          help="Only show the changes, don't try to apply them")
        return

    # let the updater validate both tags and work out what needs to be done
    try:
        updater = RunningTagUpdater(db, args.running, args.staging, args.run, args.mode, args.dry_run)
        operations = updater.calculate_update()
    except RunningTagUpdaterError as e:
        B2ERROR(e, **e.extra_vars)
        return 1

    # make sure we exit if we have nothing to do
    if not operations:
        B2INFO("Nothing to do, please check the globaltags given are correct")
        return 1

    # collect the rows to display and the numbers for the summary
    first_valid = tuple(args.run)
    latest = first_valid
    closed_count = 0
    added_count = 0
    touched_names = set()
    rows = []
    for action, item in operations:
        # count how many payloads/iovs will be closed/added
        if action == "CLOSE":
            closed_count += 1
        else:
            added_count += 1
            touched_names.add(item.name)
        # remember the highest run number of any iov: its first run ...
        first_run, final_run = item.iov[:2], item.iov[2:]
        latest = max(first_run, latest)
        # ... and its final run unless the iov is open ended
        if final_run != (-1, -1):
            latest = max(final_run, latest)
        rows.append([action, item.name, item.revision] + list(item.iov))

    # the next free run is the one after the highest run seen; if that run
    # number is negative the experiment number is increased as well
    next_exp = latest[0] + (1 if latest[1] < 0 else 0)
    summary = {
        "first valid run": first_valid,
        "payloads closed": closed_count,
        "payloads updated": len(touched_names),
        "payload iovs added": added_count,
        "next possible update": (next_exp, latest[1] + 1),
    }

    # prepare some colors for easy distinction of closed payloads
    support_color = LogPythonInterface.terminal_supports_colors()

    def color_row(row, _, line):
        """Print rows of CLOSE operations in red if the terminal supports it"""
        if not support_color:
            return line
        prefix = "\x1b[31m" if row[0] == "CLOSE" else ""
        return prefix + line + '\x1b[0m'

    # order by payload name, validity, revision and action, then add the header
    header = ["Action", "Payload", "Rev", "First Exp", "First Run", "Final Exp", "Final Run"]
    table = [header] + sorted(rows, key=lambda x: (x[1], x[3:], x[2], x[0]))
    columns = [6, '*', -6, 6, 6, 6, 6]

    with Pager(f"Changes to running tag {args.running}:", True):
        B2INFO(f"Changes to be applied to the running tag {args.running}")
        pretty_print_table(table, columns, transform=color_row)

    if args.dry_run:
        B2INFO("Running in dry mode, not applying any changes.", **summary)
        return 0

    B2WARNING("Applying these changes cannot be undone and further updates to "
              "this run range will **NOT** be possible", **summary)
    # ask if the user really knows what they're doing, an empty answer means no
    answer = input("Are you sure you want to continue? [yes/No]: ")
    while answer.lower().strip() not in ['yes', 'no', 'n', '']:
        answer = input("Please enter 'yes' or 'no': ")

    if answer.lower().strip() != 'yes':
        B2INFO("Aborted by user ...")
        return 1

    # Ok, all set ... apply the update
    try:
        updater.apply_update()
    except RunningTagUpdaterError as e:
        B2ERROR(e, **e.extra_vars)
        return 1
    else:
        B2INFO("done")
        return 0