Belle II Software  light-2212-foldex
cli_management.py
1 
8 import functools
9 import time
10 from concurrent.futures import ThreadPoolExecutor
11 from collections import defaultdict
12 from basf2 import B2INFO, B2ERROR, B2WARNING, LogPythonInterface # noqa
13 from basf2.utils import pretty_print_table
14 from terminal_utils import Pager
15 from conditions_db.iov import IoVSet, IntervalOfValidity
16 from conditions_db.runningupdate import RunningTagUpdater, RunningTagUpdaterError, RunningTagUpdateMode
17 
18 
def get_all_iovsets(existing_payloads, run_range=None):
    """Merge the single iovs of a list of payloads into one IoVSet per payload.

    Given an iterable of PayloadInformation objects, return a reduced list of
    PayloadInformation objects where all entries sharing the same name and
    revision are collapsed into one object whose ``iov`` attribute is an
    IoVSet holding all their validities (and whose ``iov_id`` is reset to None).

    Note that the returned objects are the ones passed in: for each
    (name, revision) the last object seen is modified in place and returned.

    Parameters:
        existing_payloads: iterable of objects providing ``name``, ``revision``
            and ``iov`` attributes
        run_range: if not None every iov is intersected with it (using ``&=``)
            and payloads with nothing left inside the range are dropped

    Overlaps between different revisions of the same payload name are
    reported with B2ERROR, they do not abort the processing.
    """
    # all validities per (name, revision): same revision may overlap itself
    merged_iovs = defaultdict(lambda: IoVSet(allow_overlaps=True))
    # all validities per name: here overlaps are not allowed and get reported
    per_name = defaultdict(lambda: IoVSet(allow_overlaps=False))
    # the PayloadInformation object we hand back for each (name, revision)
    latest_info = {}

    for payload in existing_payloads:
        validity = IntervalOfValidity(payload.iov)
        if run_range is not None:
            # restrict to the requested range, skip if nothing remains
            validity &= run_range
            if not validity:
                continue

        key = (payload.name, payload.revision)
        merged_iovs[key].add(validity)
        latest_info[key] = payload

        # adding to the overlap-free set raises ValueError on a conflict
        try:
            per_name[payload.name].add(validity)
        except ValueError as e:
            B2ERROR(f"Overlap for payload {payload.name} r{payload.revision}: {e}")

    # flatten again: one slightly modified PayloadInformation per (name, revision)
    result = []
    for key, iovset in merged_iovs.items():
        info = latest_info[key]
        info.iov = iovset
        info.iov_id = None
        result.append(info)

    return result
58 
59 
def create_iov_wrapper(db, globaltag_id, payload):
    """
    Create all iovs of one payload in the given globaltag.

    Calls ``db.create_iov`` once for every entry in ``payload.iov``, passing the
    globaltag id, the payload id and the unpacked ``tuple`` of the iov.

    Raises:
        RuntimeError: as soon as one ``db.create_iov`` call returns None
    """
    for single_iov in payload.iov:
        created = db.create_iov(globaltag_id, payload.payload_id, *single_iov.tuple)
        if created is not None:
            continue
        raise RuntimeError(f"Cannot create iov for {payload.name} r{payload.revision}")
67 
68 
def command_tag_merge(args, db=None):
    """
    Merge a list of globaltags in the order they are given.

    This command allows to merge a number of globaltags into a single globaltag.
    Payloads from later globaltags in the list of arguments are used to fill gaps
    present in earlier globaltags.

    The result is equivalent to having multiple globaltags setup in the conditions
    access for basf2 (highest priority goes first).

    Warning:
      The order of the globaltags is highest priority first, so payloads from
      globaltags earlier on the command line will be taken before payloads
      from later tags.

    This command requires that all globaltags are overlap free.

    For each globaltag in the list we copy all payloads to the output globaltag
    if there is no payload of that name valid for the given interval of validity
    in any previous globaltags in the list.

    If the payload overlaps partially with a payload from a previous globaltag
    in the list the interval of validity is shortened (and possibly split) to
    not overlap but to just fill the gaps.

    For example:

    globaltag A contains ::

        payload1, rev 2, valid from 1,0 to 1,10
        payload1, rev 3, valid from 1,20 to 1,22
        payload2, rev 1, valid from 1,0 to 1,-1

    globaltag B contains ::

        payload1, rev 1, valid from 1,1 to 1,30
        payload2, rev 2, valid from 0,1 to 1,20

    Then running ``b2conditionsdb tag merge -o output A B``, the output globaltag
    after the merge will contain::

        payload1, rev 2, valid from 1,0 to 1,10
        payload1, rev 1, valid from 1,11 to 1,19
        payload1, rev 3, valid from 1,20 to 1,22
        payload1, rev 1, valid from 1,23 to 1,30
        payload2, rev 2, valid from 0,1 to 0,-1
        payload2, rev 1, valid from 1,0 to 1,-1

    When finished, this command will print a table of payloads and their
    validity and from which globaltag they were taken. If ``--dry-run`` is given
    it will only print the list of payloads.

    Optionally one can specify ``--run-range`` to limit the run range for which
    the merging should take place. In the example above, running with
    ``--run-range 1 0 1 21`` the result would be ::

        payload1, rev 2, valid from 1,0 to 1,10
        payload1, rev 1, valid from 1,11 to 1,19
        payload1, rev 3, valid from 1,20 to 1,21
        payload2, rev 1, valid from 1,0 to 1,21

    .. versionadded:: release-05-01-00
    """
    # Calling convention as implemented below:
    #   * db is None: ``args`` is an argument parser; only the command line
    #     options are registered and None is returned.
    #   * otherwise:  ``args`` holds the parsed options and the merge is
    #     performed. Returns 0 on success and 1 on any error.
    # NOTE(review): the docstring above presumably doubles as the command line
    # help text, so the parameter documentation is kept in this comment - confirm.

    if db is None:
        args.add_argument("globaltag", nargs="+", help="name of the globaltag")
        group = args.add_argument_group("required named arguments")
        group.add_argument("-o", "--output", required=True, help="Name of the output globaltag")
        args.add_argument("--dry-run", help="Don't do anything, just print a table with the results",
                          action="store_true", default=False)
        args.add_argument("--run-range", nargs=4, default=None, type=int,
                          metavar=("FIRST_EXP", "FIRST_RUN", "FINAL_EXP", "FINAL_RUN"),
                          help="Can be four numbers to limit the run range to put "
                          "in the output globaltag: All iovs will be limited to "
                          "be in this range.")
        args.add_argument("-j", type=int, default=10, dest="nprocess",
                          help="Number of concurrent threads to use for "
                          "creating payloads into the output globaltag.")
        return

    # prepare some colors for easy distinction of source tag. zip() stops at the
    # shorter sequence so only the first four globaltags get a color assigned.
    support_color = LogPythonInterface.terminal_supports_colors()
    palette = ("\x1b[32m", "\x1b[34m", "\x1b[35m", "\x1b[31m")
    colors = dict(zip(args.globaltag, palette)) if support_color else {}

    def color_row(row, _, line):
        """Color the lines depending on which globaltag the payload comes from"""
        if not support_color:
            return line
        # the source globaltag is the last column of each row
        begin = colors.get(row[-1], "")
        end = '\x1b[0m'
        return begin + line + end

    with Pager("Result of merging globaltags", True):
        # make sure output tag exists
        output_info = db.get_globalTagInfo(args.output)
        if output_info is None:
            B2ERROR("Output globaltag doesn't exist. Please create it first with a proper description")
            # report failure with the same code as every other error path:
            # ``False`` compares equal to the success value 0
            return 1

        output_id = output_info["globalTagId"]

        # check all globaltags exist
        # NOTE(review): no message is emitted here, presumably get_globalTagInfo
        # already reports the missing tag - confirm.
        if any(db.get_globalTagInfo(tag) is None for tag in args.globaltag):
            return 1

        # payloads (with reduced iovs) to be created in the output tag
        final = []
        # one row per iov to be created: name, rev, 4 x iov, source tag
        table = []
        # validity already covered by higher priority tags, per payload name
        existing = defaultdict(lambda: IoVSet(allow_overlaps=True))
        if args.run_range is not None:
            args.run_range = IntervalOfValidity(args.run_range)

        # For each globaltag
        for tag in args.globaltag:
            # get all the payloads and iovs from the globaltag
            all_payloads = db.get_all_iovs(tag)
            # sort all the payloads by revision number (reversed sort: highest revisions first)
            all_payloads.sort(key=lambda p: p.revision, reverse=True)
            # and sort again but this time by name: not really necessary,
            # but it helps printing the log messages ordered by payloads name
            all_payloads.sort(key=lambda p: p.name, reverse=False)
            # get all payload information objects with their iovs already merged to iovset instances
            payloads = get_all_iovsets(all_payloads, args.run_range)
            for payload in payloads:
                # make sure it doesn't overlap with any of the previous
                payload.iov.remove(existing[payload.name])
                # nothing left: fully covered by higher priority tags
                if not payload.iov:
                    continue
                # extend the known coverage of this payload
                existing[payload.name] |= payload.iov
                # extend this payload to the list to create later
                final.append(payload)
                # and add all separate iovs to the table to show the user
                for iov in payload.iov:
                    table.append([payload.name, payload.revision] + list(iov.tuple) + [tag])

        # sort the table by payload name and start of validity. Columns 2 and 3
        # are first experiment and first run (column 1 is the revision).
        table.sort(key=lambda r: (r[0], r[2:4]))

        # and fancy print it ...
        table.insert(0, ["Name", "Rev", "First Exp", "First Run", "Final Exp", "Final Run", "Source"])
        columns = ["+", -8, 6, 6, 6, 6, max(len(_) for _ in args.globaltag)]

        B2INFO(f"Result of merging the globaltags {', '.join(args.globaltag)}")

        pretty_print_table(table, columns, transform=color_row)

    # Ok, we're still alive, create all the payloads using multiple threads.
    if not args.dry_run:
        # the table contains one header row in addition to the iovs
        B2INFO(f'Now copying the {len(final)} payloads into {args.output} to create {len(table)-1} iovs ...')
        create_iov = functools.partial(create_iov_wrapper, db, output_id)
        try:
            with ThreadPoolExecutor(max_workers=args.nprocess) as pool:
                start = time.monotonic()
                # iterating over the results re-raises any exception from the workers
                for done, _ in enumerate(pool.map(create_iov, final), 1):
                    # linear extrapolation from the average time per payload so far
                    eta = (time.monotonic() - start) / done * (len(final) - done)
                    B2INFO(f"{done}/{len(final)} payloads copied, ETA: {eta:.1f} seconds")
        except RuntimeError:
            B2ERROR("Not all iovs could be created. This could be a server/network problem "
                    "or the destination globaltag was not empty or not writeable. Please make "
                    "sure the target tag is empty and try again")
            return 1

    return 0
235 
236 
def command_tag_runningupdate(args, db=None):
    """
    Update a running globaltag with payloads from a staging tag

    This command will calculate and apply the necessary updates to a running
    globaltag with a given staging globaltag

    Running tags are defined as "immutable for existing data but conditions for
    newer runs may be added" and the only modification allowed is to add new
    payloads for new runs or close existing payloads to no longer be valid for
    new runs.

    This command takes previously prepared and validated payloads in a staging
    globaltag and will then calculate which payloads to close and what to add to
    the running globaltag.

    For this to work we require

    1. A running globaltag in the state "RUNNING"

    2. A (experiment, run) number from which run on the update should be valid.
       This run number needs to be

       a) bigger than the start of validity for all iovs in the running tag
       b) bigger than the end of validity for all closed iovs (i.e. not valid
          to infinity) in the running tag

    3. A staging globaltag with the new payloads in state "VALIDATED"

       a) payloads in the staging tag starting at (0,0) will be interpreted as
          starting at the first valid run for the update
       b) all other payloads need to start at or after the first valid run for
          the update.
       c) The globaltag needs to be gap and overlap free
       d) All payloads in the staging tag should have as last iov an open iov
          (i.e. valid to infinity) but this can be disabled.

    The script will check all the above requirements and will then calculate the
    necessary operations to

    1. Add all payloads from the staging tag where a start validity of (0, 0) is
       replaced by the starting run for which this update should be valid.
    2. close all iovs for payloads in the running tags just before the
       corresponding iov of the same payload in the staging tag, so either at the
       first run for the update to be valid or later
    3. Optionally, make sure all payloads in the staging tag end in an open iov.

    Example:

    running tag contains ::

        payload1, rev 1, valid from 0,1 to 1,0
        payload1, rev 2, valid from 1,1 to -1,-1
        payload2, rev 1, valid from 0,1 to -1,-1
        payload3, rev 1, valid from 0,1 to 1,0
        payload4, rev 1, valid from 0,1 to -1,-1
        payload5, rev 1, valid from 0,1 to -1,-1

    staging tag contains ::

        payload1, rev 3, valid from 0,0 to 1,8
        payload1, rev 4, valid from 1,9 to 1,20
        payload2, rev 2, valid from 1,5 to 1,20
        payload3, rev 2, valid from 0,0 to -1,-1
        payload4, rev 1, valid from 0,0 to 1,20

    Then running ``b2conditionsdb tag runningupdate running staging --run 1 2 --allow-closed``,
    the running globaltag after the update will contain ::

        payload1, rev 1, valid from 0,1 to 1,0
        payload1, rev 2, valid from 1,1 to 1,1
        payload1, rev 3, valid from 1,2 to 1,8
        payload1, rev 4, valid from 1,9 to 1,20
        payload2, rev 1, valid from 0,1 to 1,4
        payload2, rev 2, valid from 1,5 to 1,20
        payload3, rev 1, valid from 0,1 to 1,0
        payload3, rev 2, valid from 1,2 to -1,-1
        payload4, rev 1, valid from 0,1 to 1,20
        payload5, rev 1, valid from 0,1 to -1,-1

    Note that

    - the start of payload1 and payload3 in staging has been adjusted
    - payload2 in the running tag has been closed at 1,4, just before the
      validity from the staging tag
    - payload3 was already closed in the running tag so no change is
      performed. This might result in gaps but is intentional
    - payload4 was not closed at run 1,2 because the staging tag had the same
      revision of the payload so these were merged to one long validity.
    - payload5 was not closed as there was no update to it in the staging tag.
      If we would have run with ``--full-replacement`` it would have been closed.
    - if we would have chosen ``--run 1 1`` the update would have failed because
      payload1, rev2 in running starts at 1,1 so we would have a conflict
    - if we would have chosen ``--run 1 6`` the update would have failed because
      payload2 in the staging tag starts before this run
    - if we would have chosen to open the final iovs in staging by using
      ``--fix-closed``, payload1, rev 4; payload2, rev 2 and payload4 rev 1
      would be valid until -1,-1 after the running tag. In fact, payload 4
      would not be changed at all.
    """
    # Without a database object we are only asked to declare our command line
    # options on the parser passed as ``args``.
    if db is None:
        args.add_argument("running", help="name of the running globaltag")
        args.add_argument("staging", help="name of the staging globaltag")
        group = args.add_argument_group("required named arguments")
        group.add_argument("-r", "--run", required=True, nargs=2, type=int, metavar=("EXP", "RUN"),
                           help="First experiment + run number for which the update should be "
                           "valid. Two numbers separated by space")
        # all mode options write to the same destination, at most one may be given
        choice = args.add_mutually_exclusive_group()
        choice.add_argument("--allow-closed", dest="mode", action="store_const",
                            const=RunningTagUpdateMode.ALLOW_CLOSED,
                            default=RunningTagUpdateMode.STRICT,
                            help="if given allow payloads in the staging tag to not "
                            "be open, i.e. they don't have to be open ended in the "
                            "update. Useful to retire a payload by adding one last update")
        choice.add_argument("--fix-closed", dest="mode", action="store_const",
                            const=RunningTagUpdateMode.FIX_CLOSED,
                            help="if given automatically open the last iov for each "
                            "payload in staging if it is closed.")
        choice.add_argument("--simple-mode", dest="mode", action="store_const",
                            const=RunningTagUpdateMode.SIMPLE,
                            help="if given require the staging tag to solely consist "
                            "of fully infinite validities: Only one iov per payload "
                            "with a validity of (0,0,-1,-1)")
        choice.add_argument("--full-replacement", dest="mode", action="store_const",
                            const=RunningTagUpdateMode.FULL_REPLACEMENT,
                            help="if given perform a full replacement and close all "
                            "open iovs in the running tag not present in the staging tag. "
                            "After such an update exactly the payloads in the staging tag "
                            "will be valid after the given run. This allows for closed iovs "
                            "in the staging tag as with ``--allow-closed``")
        args.add_argument("--dry-run", default=False, action="store_true",
                          help="Only show the changes, don't try to apply them")
        return

    # let the updater work out which operations are needed
    try:
        updater = RunningTagUpdater(db, args.running, args.staging, args.run, args.mode, args.dry_run)
        operations = updater.calculate_update()
    except RunningTagUpdaterError as error:
        B2ERROR(error, **error.extra_vars)
        return 1

    # make sure we exit if we have nothing to do
    if not operations:
        B2INFO("Nothing to do, please check the globaltags given are correct")
        return 1

    # collect the rows to display and the numbers for the summary
    first_valid = tuple(args.run)
    last_valid = first_valid
    closed_count = 0
    added_count = 0
    updated_names = set()
    table = []
    for op, payload in operations:
        if op != "CLOSE":
            updated_names.add(payload.name)
            added_count += 1
            # remember the highest run number of any iov, so first run
            last_valid = max(payload.iov[:2], last_valid)
            # and final run if not open
            if payload.iov[2:] != (-1, -1):
                last_valid = max(payload.iov[2:], last_valid)
        else:
            closed_count += 1
        # every operation is shown: action, name, revision and the four iov numbers
        table.append([op, payload.name, payload.revision, *payload.iov])

    # calculate the next free run: one run after the last one touched. A
    # negative run number additionally bumps the experiment number.
    last_exp, last_run = last_valid[0], last_valid[1]
    exp_step = 1 if last_run < 0 else 0
    summary = {
        "first valid run": first_valid,
        "payloads closed": closed_count,
        # the number of distinct payload names which got at least one iov added
        "payloads updated": len(updated_names),
        "payload iovs added": added_count,
        "next possible update": (last_exp + exp_step, last_run + 1),
    }

    # prepare some colors for easy distinction of closed payloads
    support_color = LogPythonInterface.terminal_supports_colors()

    def color_row(row, _, line):
        """Color the lines which close a payload in red"""
        if not support_color:
            return line
        prefix = "\x1b[31m" if row[0] == "CLOSE" else ""
        return prefix + line + '\x1b[0m'

    # order by payload name, then validity, revision and finally action
    table.sort(key=lambda x: (x[1], x[3:], x[2], x[0]))
    header = ["Action", "Payload", "Rev", "First Exp", "First Run", "Final Exp", "Final Run"]
    table.insert(0, header)
    columns = [6, '*', -6, 6, 6, 6, 6]

    # and then show the table
    with Pager(f"Changes to running tag {args.running}:", True):
        B2INFO(f"Changes to be applied to the running tag {args.running}")
        pretty_print_table(table, columns, transform=color_row)

    if args.dry_run:
        B2INFO("Running in dry mode, not applying any changes.", **summary)
        return 0

    B2WARNING("Applying these changes cannot be undone and further updates to "
              "this run range will **NOT** be possible", **summary)
    # ask if the user really knows what they're doing: keep asking until we get
    # a recognized answer, an empty answer counts as "no"
    reply = input("Are you sure you want to continue? [yes/No]: ").lower().strip()
    while reply not in ('yes', 'no', 'n', ''):
        reply = input("Please enter 'yes' or 'no': ").lower().strip()

    if reply != 'yes':
        B2INFO("Aborted by user ...")
        return 1

    # Ok, all set ... apply the update
    try:
        updater.apply_update()
    except RunningTagUpdaterError as error:
        B2ERROR(error, **error.extra_vars)
        return 1

    B2INFO("done")
    return 0