Belle II Software  release-05-02-19
calculation.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 try:
5  from inspect import signature
6 except ImportError:
7  from funcsigs import signature
8 import time
9 
10 from hep_ipython_tools import viewer
11 from hep_ipython_tools.calculation_process import CalculationProcess
12 
13 import multiprocessing
14 
15 
class Calculation:
    """
    Create a Calculation from the given Process that handles
    the status of the process and the actions like start, stop or wait_for_end.
    Do not create instances of this class by yourself but rather use the IPythonHandler for this.

    Most query/control methods take an optional ``index`` selecting the process(es) to act on
    (see ``map_on_processes``): ``None`` for all processes, an ``int`` position in
    ``process_list``, or a process object itself. If the calculation holds exactly one
    process, ``index`` is ignored and the bare (non-list) result for that process is returned.
    """

    def __init__(self, process_list=None):
        """
        Init with the given list of processes, or with a new empty list if
        ``process_list`` is None or empty.
        """
        #: The process list (possibly empty, even later)
        self.process_list = process_list if process_list else []

        #: The calculation process type to use.
        self._calculation_process_type = CalculationProcess

    def __iter__(self):
        """
        Make the class iterable over all single processes.
        Each yielded item is a new Calculation wrapping exactly one process.
        """
        for process in self.process_list:
            yield Calculation([process])

    def __len__(self):
        """ Return the number of handled processes """
        return len(self.process_list)

    def stop(self, index=None):
        """
        Kill the processes (only those that are still running).
        Please keep in mind that killing a process is normally accompanied with data loss.
        """
        def f(process):
            if self.is_running(process):
                process.terminate()

        self.map_on_processes(f, index)

    def start_batched_and_wait_for_end(self, max_processes=None):
        """
        Start part of the processes and wait for all to finish. If max_processes is None (or 0),
        only n processes will be started where n is the number of cores on the current machine.
        As soon as one process is finished, a waiting one will be started in order to fill all cores
        of the machine.

        Parameters:
          max_processes: The number of processes which can be run at the same time
        """
        if not max_processes:
            max_processes = multiprocessing.cpu_count()

        self.ensure_running(max_processes)
        self.wait_for_end(max_processes=max_processes)

    def start(self, index=None):
        """
        Start the processes in the background.
        Raises an AssertionError if the process has been started already.
        You can not restart a process. If you want to do so, create another Calculation instance.
        """
        def f(process):
            if not process.already_run:
                process.start()
                process.already_run = True
            else:
                raise AssertionError("Calculation can not be started twice.")

        self.map_on_processes(f, index)

    def ensure_running(self, max_processes):
        """
        Ensure that the max_processes number of processes is running and will start
        processes which are waiting and have not been started yet.
        """
        running_processes = [p for p in self.process_list if self.is_running(p)]
        processes_to_start = max(0, max_processes - len(running_processes))

        for process in self.process_list:
            # start processes as long as processes_to_start is larger than zero
            if processes_to_start <= 0:
                break
            if not process.already_run:
                process.start()
                process.already_run = True
                processes_to_start -= 1

    def wait_for_end(self, display_bar=True, send_notification=False, max_processes=None):
        """
        Send the calculation into the foreground by halting the notebook as
        long as the process is running. Shows a progress bar with the number
        of processed events. Please keep in mind that you can not execute
        cells in the notebook when having called wait_for_end (but before -
        although a calculation is running.)

        Processes that have never been started are not waited for, and their
        progress bars are left untouched.

        Parameters:
          display_bar: If true, the display bar is used to show in the notebook
                  that the computation is complete.
          send_notification: If true, the notify2 library will be used to
                  notify the user if the computation is complete. This will only
                  work if the jupyter notebook is hosted on the local desktop
                  machine.
          max_processes: The maximum number of processes which will be run on
                  the machine. This has no effect when start() has been called
                  before. This parameter can not be used directly, but
                  start_batched_and_wait_for_end() should be used.

        Raises:
          ImportError: if send_notification is true and notify2 is not installed.
        """
        if display_bar:
            # Initialize all process bars
            process_bars = {process: viewer.ProgressBarViewer()
                            for process in self.process_list if process.is_valid}

        started_processes = [p for p in self.process_list if p.is_valid and p.already_run]
        running_processes = started_processes
        # Update all process bars as long as minimum one process is running
        while running_processes:

            if max_processes:
                # batched mode: fill free slots with processes not started yet
                self.ensure_running(max_processes)

            started_processes = [p for p in self.process_list if p.is_valid and p.already_run]
            running_processes = [p for p in started_processes if self.is_running(p)]
            ended_processes = [p for p in started_processes if not self.is_running(p)]

            for process in ended_processes:
                if display_bar:
                    self.show_end_result(process, process_bars)
                    # Only update the process bar once
                    if process in process_bars:
                        del process_bars[process]

            for process in running_processes:
                # running_processes was filtered on is_valid just above, so this only
                # triggers if is_valid changes in between
                if not process.is_valid:
                    if display_bar:
                        self.show_end_result(process, process_bars)

                # Check if the process is still running. If not set the end result correctly.
                elif not self.is_running(process):
                    if display_bar:
                        self.show_end_result(process, process_bars)

                # Check if there are news from the process python module (a new percentage)
                elif process.progress_queue_local.poll():
                    result = process.progress_queue_local.recv()
                    if result != "end" and display_bar:
                        process_bar = process_bars[process]
                        process_bar.update(result)

                else:
                    process.result_queue.fill_results()
                    process.join(timeout=0.01)

            time.sleep(0.01)

        if display_bar:
            # Finalize the remaining bars. Unstarted processes must be skipped: has_failed()
            # raises for them (e.g. after start(0) on a multi-process calculation).
            for process in self.process_list:
                if self.is_finished(process):
                    self.show_end_result(process, process_bars)

        # send notification if requested and notify2 library is available
        if send_notification:
            self._send_finished_notification()

    @staticmethod
    def _send_finished_notification():
        """Show a desktop notification via notify2 saying that the calculation finished."""
        try:
            import notify2
        except ImportError as err:
            # re-throw with more useful message, keeping the original cause
            raise ImportError("notify2 library must be installed to show Desktop notifications.") from err

        notify2.init("basf2")
        n = notify2.Notification("basf2",  # head line
                                 "Calculation finished",  # Description text
                                 "notification-message-im"  # Icon name
                                 )
        n.show()

    def show_end_result(self, process, process_bars):
        """
        Shows the end result (finished or failed) of the given process, if it has an entry
        in process_bars. The process must have finished, otherwise has_failed() raises.
        """
        if process in process_bars:
            process_bar = process_bars[process]
            if self.has_failed(process):
                process_bar.update("failed!")
            else:
                process_bar.update("finished")

    def map_on_processes(self, map_function, index):
        """
        Calculate a function on all processes and collect the results in a list if index is None.
        Else calculate the function only on the given process or the process number.

        If the calculation holds exactly one process, map_function is applied to it
        regardless of index and the bare result is returned.
        """
        if len(self.process_list) == 1:
            return map_function(self.process_list[0])
        if index is None:
            return list(map(map_function, self.process_list))
        if isinstance(index, int):
            return map_function(self.process_list[index])
        return map_function(index)

    def is_running(self, index=None):
        """
        Test if the process is still running
        """
        return self.map_on_processes(lambda process: process.is_alive(), index)

    def is_finished(self, index=None):
        """
        Test if the process has finished (it was started and is no longer running)
        """
        return self.map_on_processes(lambda process: process.already_run and not self.is_running(process), index)

    def has_failed(self, index=None):
        """
        Test if the process has failed (finished with a non-zero exit code).
        Raises an AssertionError if the process has not finished.
        """
        def f(process):
            if self.is_finished(process):
                return process.exitcode != 0
            else:
                raise AssertionError("Calculation has not finished.")

        return self.map_on_processes(f, index)

    def get(self, name, index=None):
        """
        Return the saved queue item with the given name (None if it was not saved)
        """
        def f(process, name):
            try:
                return process.get(name)
            except KeyError:
                return None
        return self.map_on_processes(lambda process: f(process, name), index)

    def get_keys(self, index=None):
        """
        Return the names of the items that were saved in the queue
        """
        return self.map_on_processes(lambda process: process.get_keys(), index)

    def get_log(self, index=None):
        """
        Return the log of the process if it is running or finished.
        Raises an AssertionError if it has not been started.
        """
        def f(process):
            if self.is_running(process) or self.is_finished(process):
                return process.get_log()
            else:
                raise AssertionError("Calculation has not been started.")

        return self.map_on_processes(f, index)

    def get_status(self, index=None):
        """
        Return a string describing the current status of the calculation:
        "not started", "running", "failed" or "finished".
        """
        def f(process):
            if not process.already_run:
                return "not started"
            elif self.is_running(process):
                return "running"
            elif self.has_failed(process):
                return "failed"
            elif self.is_finished(process):
                return "finished"

        return self.map_on_processes(f, index)

    def get_parameters(self, index=None):
        """
        Return the parameters used to start this calculation
        """
        return self.map_on_processes(lambda process: process.parameters, index)

    def create_widgets_for_all_processes(self, widget_function, index=None):
        """
        Create an overview widget for all processes or only one for the given process, and show it.
        If widget_function yields None for the selected single process, nothing is shown.
        """
        if len(self.process_list) == 1:
            widget = widget_function(self.process_list[0])
        elif index is None:
            widget = viewer.ProcessViewer(list(map(widget_function, self.process_list)))
        elif isinstance(index, int):
            widget = widget_function(self.process_list[index])
        else:
            widget = widget_function(index)

        # show_statistics/show_collections return None when there is nothing to show
        if widget is not None:
            widget.show()

    def show_log(self, index=None):
        """
        Show the log of the underlying process(es).
        """

        def f(process):
            if self.is_running(process) or self.is_finished(process):
                return viewer.LogViewer(process.get_log())
            else:
                raise AssertionError("Calculation has not been started.")

        self.create_widgets_for_all_processes(f, index)

    def show_statistics(self, index=None):
        """
        Show the statistics in a smart manner (only if "ipython.statistics" was saved)
        """

        def f(process):
            if "ipython.statistics" in self.get_keys(process):
                return viewer.StatisticsViewer(self.get("ipython.statistics", process))
            else:
                return None

        self.create_widgets_for_all_processes(f, index)

    def show_collections(self, index=None):
        """
        Show some snapshots on the collections.
        Remember to add the PrintCollectionsPython Module for that!
        """

        def f(process):
            if "ipython.store_content" in self.get_keys(process):
                return viewer.CollectionsViewer(self.get("ipython.store_content", process))
            else:
                return None

        self.create_widgets_for_all_processes(f, index)

    def get_statistics(self, index=None):
        """
        Return the statistics of the process if it finished successfully.
        Raises an AssertionError if the process has not finished or has failed.
        """
        def f(process):
            if not self.is_finished(process):
                raise AssertionError("Calculation has not finished.")
            if self.has_failed(process):
                raise AssertionError("Calculation has failed.")
            return self.get("ipython.statistics", process)

        return self.map_on_processes(f, index)

    def append(self, result_queue, log_file_name, parameters, **kwargs):
        """
        Construct a new process with the given parameters and add it to the process_list.
        """
        self.process_list.append(self._calculation_process_type(result_queue=result_queue, log_file_name=log_file_name,
                                                                parameters=parameters, **kwargs))
hep_ipython_tools.calculation.Calculation.start
def start(self, index=None)
Definition: calculation.py:72
hep_ipython_tools.calculation.Calculation.get_parameters
def get_parameters(self, index=None)
Definition: calculation.py:289
hep_ipython_tools.calculation.Calculation.get_status
def get_status(self, index=None)
Definition: calculation.py:273
hep_ipython_tools.viewer.LogViewer
Definition: viewer.py:289
hep_ipython_tools.calculation.Calculation.create_widgets_for_all_processes
def create_widgets_for_all_processes(self, widget_function, index=None)
Definition: calculation.py:295
hep_ipython_tools.calculation.Calculation.process_list
process_list
The process list (possibly empty, even later)
Definition: calculation.py:28
hep_ipython_tools.calculation.Calculation.wait_for_end
def wait_for_end(self, display_bar=True, send_notification=False, max_processes=None)
Definition: calculation.py:104
hep_ipython_tools.calculation.Calculation.get
def get(self, name, index=None)
Definition: calculation.py:244
hep_ipython_tools.calculation_process
Definition: calculation_process.py:1
hep_ipython_tools.calculation.Calculation.__len__
def __len__(self)
Definition: calculation.py:42
hep_ipython_tools.calculation.Calculation.show_end_result
def show_end_result(self, process, process_bars)
Definition: calculation.py:193
hep_ipython_tools.calculation.Calculation.__init__
def __init__(self, process_list=None)
Definition: calculation.py:24
hep_ipython_tools.calculation.Calculation.append
def append(self, result_queue, log_file_name, parameters, **kwargs)
Definition: calculation.py:365
hep_ipython_tools.calculation.Calculation.stop
def stop(self, index=None)
Definition: calculation.py:46
hep_ipython_tools.calculation.Calculation.get_log
def get_log(self, index=None)
Definition: calculation.py:261
hep_ipython_tools.calculation.Calculation.get_statistics
def get_statistics(self, index=None)
Definition: calculation.py:353
hep_ipython_tools.calculation.Calculation.has_failed
def has_failed(self, index=None)
Definition: calculation.py:232
hep_ipython_tools.calculation.Calculation.is_running
def is_running(self, index=None)
Definition: calculation.py:220
hep_ipython_tools.calculation.Calculation
Definition: calculation.py:16
hep_ipython_tools.calculation.Calculation.is_finished
def is_finished(self, index=None)
Definition: calculation.py:226
hep_ipython_tools.calculation.Calculation.show_log
def show_log(self, index=None)
Definition: calculation.py:313
hep_ipython_tools.calculation.Calculation.show_statistics
def show_statistics(self, index=None)
Definition: calculation.py:326
hep_ipython_tools.calculation.Calculation.show_collections
def show_collections(self, index=None)
Definition: calculation.py:339
hep_ipython_tools.calculation.Calculation.ensure_running
def ensure_running(self, max_processes)
Definition: calculation.py:87
hep_ipython_tools.calculation.Calculation.get_keys
def get_keys(self, index=None)
Definition: calculation.py:255
hep_ipython_tools.viewer.ProgressBarViewer
Definition: viewer.py:59
hep_ipython_tools.calculation.Calculation.__iter__
def __iter__(self)
Definition: calculation.py:35
hep_ipython_tools.calculation.Calculation._calculation_process_type
_calculation_process_type
The calculation process type to use.
Definition: calculation.py:33
hep_ipython_tools.viewer.CollectionsViewer
Definition: viewer.py:139
hep_ipython_tools.viewer.StatisticsViewer
Definition: viewer.py:184
hep_ipython_tools.calculation.Calculation.start_batched_and_wait_for_end
def start_batched_and_wait_for_end(self, max_processes=None)
Definition: calculation.py:56
hep_ipython_tools.calculation.Calculation.map_on_processes
def map_on_processes(self, map_function, index)
Definition: calculation.py:204
hep_ipython_tools.viewer.ProcessViewer
Definition: viewer.py:246