Belle II Software  release-08-01-10
calculation.py
1 #!/usr/bin/env python3
2 
3 
10 
11 import time
12 
13 from hep_ipython_tools import viewer
14 from hep_ipython_tools.calculation_process import CalculationProcess
15 
16 import multiprocessing
17 
18 
20 
21  """
22  Create a Calculation from the given Process that handles
23  the status of the process and the actions like start, stop or wait_for_end
24  Do not create instances of this class by yourself but rather use the IPythonHandler for this.
25  """
26 
def __init__(self, process_list=None):
    """
    Initialise the calculation with the given processes.

    Parameters:
      process_list: Optional list of processes to handle. If it is missing
        or empty, the calculation starts with a fresh empty list.
    """
    # The process list (possibly empty, even later)
    self.process_list = process_list if process_list else []

    # The calculation process type to use.
    self._calculation_process_type = CalculationProcess
37 
38  def __iter__(self):
39  """
40  Make the class iterable over all single processes
41  """
42  for process in self.process_listprocess_list:
43  yield Calculation([process])
44 
45  def __len__(self):
46  """ Return the number of handled processes """
47  return len(self.process_listprocess_list)
48 
def stop(self, index=None):
    """
    Kill the processes. Please keep in mind that killing a process is
    normally accompanied with data loss.

    Parameters:
      index: Forwarded to map_on_processes: None addresses all processes,
        an int a position in the process list, anything else is taken as
        the process itself.
    """
    def terminate_if_running(process):
        # Only processes that are still alive are terminated.
        if self.is_running(process):
            process.terminate()

    self.map_on_processes(terminate_if_running, index)
58 
def start_batched_and_wait_for_end(self, max_processes=None):
    """
    Start part of the processes and wait for all to finish. If max_processes is None,
    only n processes will be started where n is the number of cores on the current machine.
    As soon as one process is finished, a waiting one will be started in order to fill all cores
    of the machine.

    Parameters:
      max_processes: The number of processes which can be run at the same time
    """
    if not max_processes:
        max_processes = multiprocessing.cpu_count()

    # Fill the available slots first, then let wait_for_end keep them filled.
    self.ensure_running(max_processes)
    self.wait_for_end(max_processes=max_processes)
74 
def start(self, index=None):
    """
    Start the processes in the background.
    Raises an AssertionError if the process has been started already.
    You can not restart a process. If you want to do so, create another Calculation instance.

    Parameters:
      index: Forwarded to map_on_processes: None addresses all processes,
        an int a position in the process list, anything else is taken as
        the process itself.
    """
    def start_once(process):
        if process.already_run:
            raise AssertionError("Calculation can not be started twice.")
        process.start()
        process.already_run = True

    self.map_on_processes(start_once, index)
89 
def ensure_running(self, max_processes):
    """
    Ensure that the max_processes number of processes is running and will start
    processes which are waiting and have not been started yet.

    Parameters:
      max_processes: The number of processes which may run at the same time.
    """
    running_count = sum(1 for p in self.process_list if self.is_running(p))
    free_slots = max(0, max_processes - running_count)

    for process in self.process_list:
        if free_slots <= 0:
            # All slots are taken, nothing more can be started now.
            break
        if not process.already_run:
            process.start()
            process.already_run = True
            free_slots -= 1
106 
def wait_for_end(self, display_bar=True, send_notification=False, max_processes=None):
    """
    Send the calculation into the foreground by halting the notebook as
    long as the process is running. Shows a progress bar with the number
    of processed events. Please keep in mind that you can not execute
    cells in the notebook when having called wait_for_end (but before -
    although a calculation is running.)

    Parameters:
      display_bar: If true, the display bar is used to show in the notebook
        that the computation is complete.
      send_notification: If true, the notify2 library will be used to
        notify the user if the computation is complete. This will only
        work if the jupyter notebook is hosted on the local desktop
        machine.
      max_processes: The maximum number of processes which will be run on
        the machine. This has no effect when start() has been called
        before. This parameter can not be used directly, but
        start_batched_and_wait_for_end() should be used.

    Raises:
      ImportError: If send_notification is requested but notify2 can not
        be imported.
    """
    # One progress bar per valid process. The dict stays empty without
    # display_bar, so the name is defined on every path.
    process_bars = {}
    if display_bar:
        process_bars = {process: viewer.ProgressBarViewer()
                        for process in self.process_list if process.is_valid}

    started_processes = [p for p in self.process_list if p.is_valid and p.already_run]
    running_processes = started_processes
    # Update all process bars as long as minimum one process is running
    while len(running_processes) > 0:

        if max_processes:
            # Refill freed slots with processes that are still waiting.
            self.ensure_running(max_processes)

        started_processes = [p for p in self.process_list if p.is_valid and p.already_run]
        running_processes = [p for p in started_processes if self.is_running(p)]
        ended_processes = [p for p in started_processes if not self.is_running(p)]

        for process in ended_processes:
            if display_bar:
                self.show_end_result(process, process_bars)
                # Only update the process bar once
                if process in process_bars:
                    del process_bars[process]

        for process in running_processes:
            # Check if the process is valid
            if not process.is_valid:
                if display_bar:
                    self.show_end_result(process, process_bars)

            # Check if the process is still running. If not set the end result correctly.
            elif not self.is_running(process):
                if display_bar:
                    self.show_end_result(process, process_bars)

            # Check if there are news from the process python module (a new percentage)
            elif process.progress_queue_local.poll():
                result = process.progress_queue_local.recv()
                if result != "end" and display_bar:
                    process_bars[process].update(result)

            else:
                process.result_queue.fill_results()
                process.join(timeout=0.01)

        time.sleep(0.01)

    if display_bar:
        for process in self.process_list:
            self.show_end_result(process, process_bars)

    # send notification if requested and notify2 library is available
    if send_notification:
        try:
            import notify2
        except ImportError as error:
            # re-throw with more useful message, keeping the original cause
            raise ImportError("notify2 library must be installed to show Desktop notifications.") from error

        notify2.init("basf2")
        notification = notify2.Notification("basf2",  # head line
                                            "Calculation finished",  # Description text
                                            "notification-message-im"  # Icon name
                                            )
        notification.show()
195 
def show_end_result(self, process, process_bars):
    """
    Show the end result (finished or failed) of the given process on its
    progress bar. Nothing happens if the process has no entry in process_bars.

    Parameters:
      process: The process whose result should be displayed.
      process_bars: Mapping from process to its progress bar.
    """
    if process not in process_bars:
        return

    process_bar = process_bars[process]
    process_bar.update("failed!" if self.has_failed(process) else "finished")
206 
def map_on_processes(self, map_function, index):
    """
    Calculate a function on all processes and collect the results if index is None.
    Else calculate the function only on the given process or the process number.

    If exactly one process is handled, the function is always applied to
    that process and its plain result is returned, whatever index is.

    Parameters:
      map_function: Callable taking a single process.
      index: None for all processes, an int for a position in the process
        list, or the process itself.
    """
    if len(self.process_list) == 1:
        return map_function(self.process_list[0])

    if index is None:
        return [map_function(process) for process in self.process_list]

    if isinstance(index, int):
        return map_function(self.process_list[index])

    # Anything else is treated as the process object itself.
    return map_function(index)
222 
def is_running(self, index=None):
    """
    Test if the process is still running, i.e. if its is_alive() is true.

    Parameters:
      index: Forwarded to map_on_processes (None, a position or a process).
    """
    return self.map_on_processes(lambda process: process.is_alive(), index)
228 
def is_finished(self, index=None):
    """
    Test if the process has finished, i.e. it was started and is not
    running any more.

    Parameters:
      index: Forwarded to map_on_processes (None, a position or a process).
    """
    def finished(process):
        return process.already_run and not self.is_running(process)

    return self.map_on_processes(finished, index)
234 
def has_failed(self, index=None):
    """
    Test if the process has failed, i.e. finished with a non-zero exit code.
    Raises an AssertionError if the process has not finished yet.

    Parameters:
      index: Forwarded to map_on_processes (None, a position or a process).
    """
    def failed(process):
        if not self.is_finished(process):
            raise AssertionError("Calculation has not finished.")
        return process.exitcode != 0

    return self.map_on_processes(failed, index)
246 
def get(self, name, index=None):
    """
    Return the saved queue item with the given name, or None if the
    process raises a KeyError for that name.

    Parameters:
      name: Name of the item to fetch.
      index: Forwarded to map_on_processes (None, a position or a process).
    """
    def fetch(process):
        try:
            return process.get(name)
        except KeyError:
            return None

    return self.map_on_processes(fetch, index)
257 
def get_keys(self, index=None):
    """
    Return the names of the items that were saved in the queue.

    Parameters:
      index: Forwarded to map_on_processes (None, a position or a process).
    """
    return self.map_on_processes(lambda process: process.get_keys(), index)
263 
def get_log(self, index=None):
    """
    Return the log of the process once it is running or has finished.
    Raises an AssertionError if the process has not been started.

    Parameters:
      index: Forwarded to map_on_processes (None, a position or a process).
    """
    def read_log(process):
        if not (self.is_running(process) or self.is_finished(process)):
            raise AssertionError("Calculation has not been started.")
        return process.get_log()

    return self.map_on_processes(read_log, index)
275 
def get_status(self, index=None):
    """
    Return a string describing the current status of the calculation:
    "not started", "running", "failed" or "finished".

    Parameters:
      index: Forwarded to map_on_processes (None, a position or a process).
    """
    def describe(process):
        if not process.already_run:
            return "not started"
        if self.is_running(process):
            return "running"
        if self.has_failed(process):
            return "failed"
        if self.is_finished(process):
            return "finished"
        return None

    return self.map_on_processes(describe, index)
291 
def get_parameters(self, index=None):
    """
    Return the parameters used to start this calculation.

    Parameters:
      index: Forwarded to map_on_processes (None, a position or a process).
    """
    return self.map_on_processes(lambda process: process.parameters, index)
297 
def create_widgets_for_all_processes(self, widget_function, index=None):
    """
    Create an overview widget for all processes or only one for the given
    process, and show it.

    Parameters:
      widget_function: Callable creating the widget for a single process.
        It may return None, in which case nothing is shown for it.
      index: None for all processes (combined in a viewer.ProcessViewer),
        an int for a position in the process list, or the process itself.
        It is ignored if exactly one process is handled.
    """
    if len(self.process_list) == 1:
        widget = widget_function(self.process_list[0])
    elif index is None:
        widget = viewer.ProcessViewer([widget_function(process) for process in self.process_list])
    elif isinstance(index, int):
        widget = widget_function(self.process_list[index])
    else:
        widget = widget_function(index)

    # The widget functions used by show_statistics/show_collections return
    # None when there is nothing to display, so there is nothing to show.
    if widget is not None:
        widget.show()
315 
def show_log(self, index=None):
    """
    Show the log of the underlying process(es).
    Raises an AssertionError for a process that has not been started.

    Parameters:
      index: Forwarded to create_widgets_for_all_processes.
    """
    def make_log_viewer(process):
        if not (self.is_running(process) or self.is_finished(process)):
            raise AssertionError("Calculation has not been started.")
        return viewer.LogViewer(process.get_log())

    self.create_widgets_for_all_processes(make_log_viewer, index)
328 
def show_statistics(self, index=None):
    """
    Show the statistics in a smart manner.

    A process without an "ipython.statistics" item yields no widget.

    Parameters:
      index: Forwarded to create_widgets_for_all_processes.
    """
    def make_statistics_viewer(process):
        if "ipython.statistics" not in self.get_keys(process):
            return None
        return viewer.StatisticsViewer(self.get("ipython.statistics", process))

    self.create_widgets_for_all_processes(make_statistics_viewer, index)
341 
def show_collections(self, index=None):
    """
    Show some snapshots on the collections.
    Remember to add the PrintCollectionsPython Module for that!

    A process without an "ipython.store_content" item yields no widget.

    Parameters:
      index: Forwarded to create_widgets_for_all_processes.
    """
    def make_collections_viewer(process):
        if "ipython.store_content" not in self.get_keys(process):
            return None
        return viewer.CollectionsViewer(self.get("ipython.store_content", process))

    self.create_widgets_for_all_processes(make_collections_viewer, index)
355 
def get_statistics(self, index=None):
    """
    Return the statistics of the process if it finished successfully.
    Raises an AssertionError if the process has not finished or has failed.

    Parameters:
      index: Forwarded to map_on_processes (None, a position or a process).
    """
    def read_statistics(process):
        if not self.is_finished(process) or self.has_failed(process):
            raise AssertionError("Calculation has not finished.")
        return self.get("ipython.statistics", process)

    return self.map_on_processes(read_statistics, index)
367 
def append(self, result_queue, log_file_name, parameters, **kwargs):
    """
    Construct a new process with the given parameters and add it to the process_list.

    Parameters:
      result_queue: Passed to the process type as result_queue.
      log_file_name: Passed to the process type as log_file_name.
      parameters: Passed to the process type as parameters.
      kwargs: Any further keyword arguments for the process type.
    """
    new_process = self._calculation_process_type(result_queue=result_queue,
                                                 log_file_name=log_file_name,
                                                 parameters=parameters,
                                                 **kwargs)
    self.process_list.append(new_process)
_calculation_process_type
The calculation process type to use.
Definition: calculation.py:36
process_list
The process list (possibly empty, even later)
Definition: calculation.py:31
def wait_for_end(self, display_bar=True, send_notification=False, max_processes=None)
Definition: calculation.py:107
def map_on_processes(self, map_function, index)
Definition: calculation.py:207
def create_widgets_for_all_processes(self, widget_function, index=None)
Definition: calculation.py:298
def append(self, result_queue, log_file_name, parameters, **kwargs)
Definition: calculation.py:368
def show_end_result(self, process, process_bars)
Definition: calculation.py:196
def start_batched_and_wait_for_end(self, max_processes=None)
Definition: calculation.py:59
def __init__(self, process_list=None)
Definition: calculation.py:27
def ensure_running(self, max_processes)
Definition: calculation.py:90
def get(self, name, index=None)
Definition: calculation.py:247