Belle II Software development
calculation.py
1#!/usr/bin/env python3
2
3
10
11import time
12
13from hep_ipython_tools import viewer
14from hep_ipython_tools.calculation_process import CalculationProcess
15
16import multiprocessing
17
18
20
21 """
22 Create a Calculation from the given Process that handles
23 the status of the process and the actions like start, stop or wait_for_end
24 Do not create instances of this class by yourself but rather use the IPythonHandler for this.
25 """
26
def __init__(self, process_list=None):
    """
    Set up the calculation with the given processes.

    Parameters:
      process_list: the processes to handle. A falsy value (None or an empty
        list) is replaced by a fresh empty list.
    """
    # Truthiness check on purpose: an empty list passed in is not reused.
    self.process_list = process_list if process_list else []
    # Factory used by append() to construct new processes.
    self._calculation_process_type = CalculationProcess
37
def __iter__(self):
    """
    Iterate over the handled processes, yielding each one wrapped in its own
    single-process Calculation.
    """
    yield from (Calculation([single]) for single in self.process_list)
44
def __len__(self):
    """Number of processes handled by this calculation."""
    count = len(self.process_list)
    return count
48
def stop(self, index=None):
    """
    Terminate the selected process(es) that are still running.

    Please keep in mind that killing a process is normally accompanied by data loss.

    Parameters:
      index: None for all processes, an int position in process_list, or a process object.
    """
    def _terminate_if_alive(proc):
        # Processes that are not running (never started or already ended) are left alone.
        if not self.is_running(proc):
            return
        proc.terminate()

    self.map_on_processes(_terminate_if_alive, index)
58
def start_batched_and_wait_for_end(self, max_processes=None):
    """
    Start part of the processes and wait for all of them to finish.

    At most max_processes processes run at the same time; whenever one ends,
    a waiting one is started to keep all slots busy.

    Parameters:
      max_processes: the number of processes which can run at the same time.
        When None (or 0), multiprocessing.cpu_count() is used.
    """
    limit = max_processes or multiprocessing.cpu_count()

    self.ensure_running(limit)
    self.wait_for_end(max_processes=limit)
74
def start(self, index=None):
    """
    Start the selected process(es) in the background.

    A process can not be restarted; create another Calculation instance instead.

    Raises:
      AssertionError: if a selected process has been started already.
    """
    def _launch(proc):
        if proc.already_run:
            raise AssertionError("Calculation can not be started twice.")
        proc.start()
        proc.already_run = True

    self.map_on_processes(_launch, index)
89
def ensure_running(self, max_processes):
    """
    Start not-yet-started processes until max_processes of them are running.

    Processes are started in process_list order; processes that already ran
    are never started again.
    """
    alive_count = sum(1 for proc in self.process_list if self.is_running(proc))
    free_slots = max(0, max_processes - alive_count)

    for proc in self.process_list:
        # Skip processes that already ran, and everything once all slots are filled.
        if proc.already_run or free_slots <= 0:
            continue
        proc.start()
        proc.already_run = True
        free_slots -= 1
106
def wait_for_end(self, display_bar=True, send_notification=False, max_processes=None):
    """
    Send the calculation into the foreground by halting the notebook as
    long as the process is running. Shows a progress bar with the number
    of processed events. Please keep in mind that you can not execute
    cells in the notebook after calling wait_for_end (before that call you
    can, even while a calculation is running).

    The method polls all started processes every 0.01 s until none of them
    is alive any more.

    Parameters:
      display_bar: If true, the display bar is used to show in the notebook
        that the computation is complete.
      send_notification: If true, the notify2 library will be used to
        notify the user if the computation is complete. This will only
        work if the jupyter notebook is hosted on the local desktop
        machine.
      max_processes: The maximum number of processes which will be run on
        the machine. This has no effect when start() has been called
        before. This parameter can not be used directly, but
        start_batched_and_wait_for_end() should be used.

    Raises:
      ImportError: if send_notification is set and notify2 is not installed.
    """

    if display_bar:
        # Initialize all process bars
        # (process_bars is only bound here; every later use is guarded by display_bar)
        process_bars = {process: viewer.ProgressBarViewer()
                        for process in self.process_list if process.is_valid}

    started_processes = [p for p in self.process_list if p.is_valid and p.already_run]
    running_processes = started_processes
    # Update all process bars as long as minimum one process is running
    while len(running_processes) > 0:

        if max_processes:
            # Batched mode: refill free slots with processes that have not run yet.
            self.ensure_running(max_processes)

        started_processes = [p for p in self.process_list if p.is_valid and p.already_run]
        running_processes = [p for p in started_processes if self.is_running(p)]
        ended_processes = [p for p in started_processes if not self.is_running(p)]

        for process in ended_processes:
            if display_bar:
                self.show_end_result(process, process_bars)
                # Only update the process bar once
                if process in process_bars:
                    del process_bars[process]

        for process in running_processes:
            # Check if the process is valid
            # NOTE(review): started_processes is already filtered on is_valid, so this
            # branch looks unreachable unless is_valid can change in between.
            if not process.is_valid:
                if display_bar:
                    self.show_end_result(process, process_bars)

            # Check if the process is still running. If not set the end result correctly.
            # (it may have ended since running_processes was computed above)
            elif not self.is_running(process):
                if display_bar:
                    self.show_end_result(process, process_bars)

            # Check if there are news from the process python module (a new percentage)
            elif process.progress_queue_local.poll():
                result = process.progress_queue_local.recv()
                if result != "end" and display_bar:
                    process_bar = process_bars[process]
                    process_bar.update(result)

            else:
                # No progress message: collect results and wait briefly for the process.
                process.result_queue.fill_results()
                process.join(timeout=0.01)

        time.sleep(0.01)

    if display_bar:
        # NOTE(review): bars of ended processes were removed in the loop above, so the
        # remaining entries belong to valid processes that were never started. For those,
        # show_end_result -> has_failed raises AssertionError("Calculation has not finished.").
        for process in self.process_list:
            self.show_end_result(process, process_bars)

    # send notification if requested and notify2 library is available
    if send_notification:
        try:
            import notify2

            notify2.init("basf2")
            n = notify2.Notification("basf2",  # head line
                                     "Calculation finished",  # Description text
                                     "notification-message-im"  # Icon name
                                     )
            n.show()
        except ImportError:
            # re-throw with more useful message
            # (the original ImportError is kept only as the implicit __context__)
            raise ImportError("notify2 library must be installed to show Desktop notifications.")
195
def show_end_result(self, process, process_bars):
    """
    Show the end result of the given process on its progress bar.

    The bar shows "failed!" for a failed process and "finished" otherwise.
    Nothing happens if process_bars holds no bar for this process.
    """
    if process not in process_bars:
        return
    status_text = "failed!" if self.has_failed(process) else "finished"
    process_bars[process].update(status_text)
206
def map_on_processes(self, map_function, index):
    """
    Apply map_function to the selected process(es) and return the result(s).

    Selection rules:
      - exactly one handled process: it is always used and index is ignored;
      - index None: map_function is applied to every process, results returned as a list;
      - int index: the process at that position in process_list;
      - anything else: index itself is treated as the process.
    """
    processes = self.process_list

    if len(processes) == 1:
        return map_function(processes[0])

    if index is None:
        return [map_function(proc) for proc in processes]

    target = processes[index] if isinstance(index, int) else index
    return map_function(target)
222
def is_running(self, index=None):
    """
    Tell whether the selected process(es) are still alive.
    """
    def _alive(proc):
        return proc.is_alive()

    return self.map_on_processes(_alive, index)
228
def is_finished(self, index=None):
    """
    Tell whether the selected process(es) have been started and are no longer running.
    """
    def _done(proc):
        # Short-circuits: a never-started process reports its (falsy) already_run value.
        return proc.already_run and not self.is_running(proc)

    return self.map_on_processes(_done, index)
234
def has_failed(self, index=None):
    """
    Tell whether the selected process(es) ended with a non-zero exit code.

    Raises:
      AssertionError: if a selected process has not finished yet.
    """
    def _failed(proc):
        if not self.is_finished(proc):
            raise AssertionError("Calculation has not finished.")
        return proc.exitcode != 0

    return self.map_on_processes(_failed, index)
246
def get(self, name, index=None):
    """
    Return the saved queue item called name, or None where a process has no such item.
    """
    def _lookup(proc):
        try:
            return proc.get(name)
        except KeyError:
            return None

    return self.map_on_processes(_lookup, index)
257
def get_keys(self, index=None):
    """
    Return the names of the items the selected process(es) saved in the queue.
    """
    def _keys_of(proc):
        return proc.get_keys()

    return self.map_on_processes(_keys_of, index)
263
def get_log(self, index=None):
    """
    Return the log of the selected process(es) once started (running or finished).

    Raises:
      AssertionError: if a selected process has not been started.
    """
    def _log_of(proc):
        has_started = self.is_running(proc) or self.is_finished(proc)
        if not has_started:
            raise AssertionError("Calculation has not been started.")
        return proc.get_log()

    return self.map_on_processes(_log_of, index)
275
def get_status(self, index=None):
    """
    Return a string describing the current status of the calculation:
    "not started", "running", "failed" or "finished".
    """
    def _status_of(proc):
        if not proc.already_run:
            return "not started"
        if self.is_running(proc):
            return "running"
        if self.has_failed(proc):
            return "failed"
        if self.is_finished(proc):
            return "finished"
        return None

    return self.map_on_processes(_status_of, index)
291
def get_parameters(self, index=None):
    """
    Return the parameters the selected process(es) were created with.
    """
    def _parameters_of(proc):
        return proc.parameters

    return self.map_on_processes(_parameters_of, index)
297
def create_widgets_for_all_processes(self, widget_function, index=None):
    """
    Create and show an overview widget for all processes, or one widget for the given process.

    Parameters:
      widget_function: callable taking one process and returning a widget (an object
        with a show() method), or None when there is nothing to display.
      index: None for all processes, an int position in process_list, or a process
        object. Ignored when exactly one process is handled.
    """
    # Same process-selection rules as map_on_processes; reuse it instead of duplicating them.
    widget = self.map_on_processes(widget_function, index)

    if index is None and len(self.process_list) != 1:
        # All processes requested: bundle the per-process widgets into one overview.
        # NOTE(review): entries may be None if widget_function returned None for a process.
        widget = viewer.ProcessViewer(widget)

    # A widget_function may return None (nothing to display); None has no show().
    if widget is not None:
        widget.show()
315
def show_log(self, index=None):
    """
    Show the log of the underlying process(es) in a LogViewer widget.

    Raises:
      AssertionError: for a selected process that has not been started.
    """

    def f(process):
        if self.is_running(process) or self.is_finished(process):
            return viewer.LogViewer(process.get_log())
        else:
            raise AssertionError("Calculation has not been started.")

    # Hand the widget factory over for display; without this the method had no effect.
    self.create_widgets_for_all_processes(f, index)
328
def show_statistics(self, index=None):
    """
    Show the saved "ipython.statistics" item of the process(es) in a StatisticsViewer.
    """

    def f(process):
        # Returns None for a process that saved no statistics.
        if "ipython.statistics" in self.get_keys(process):
            return viewer.StatisticsViewer(self.get("ipython.statistics", process))
        else:
            return None

    # Hand the widget factory over for display; without this the method had no effect.
    self.create_widgets_for_all_processes(f, index)
341
def show_collections(self, index=None):
    """
    Show some snapshots of the collections (the saved "ipython.store_content" item)
    in a CollectionsViewer.
    Remember to add the PrintCollectionsPython Module for that!
    """

    def f(process):
        # Returns None for a process that saved no collection snapshots.
        if "ipython.store_content" in self.get_keys(process):
            return viewer.CollectionsViewer(self.get("ipython.store_content", process))
        else:
            return None

    # Hand the widget factory over for display; without this the method had no effect.
    self.create_widgets_for_all_processes(f, index)
355
def get_statistics(self, index=None):
    """
    Return the statistics (the saved "ipython.statistics" item) of the process(es).

    Raises:
      AssertionError: "Calculation has not finished." if a selected process has not
        finished yet, or "Calculation has failed." if it finished with a non-zero
        exit code.
    """
    def f(process):
        if not self.is_finished(process):
            raise AssertionError("Calculation has not finished.")
        if self.has_failed(process):
            # Finished, but unsuccessfully: report that instead of "not finished".
            raise AssertionError("Calculation has failed.")
        return self.get("ipython.statistics", process)

    return self.map_on_processes(f, index)
367
def append(self, result_queue, log_file_name, parameters, **kwargs):
    """
    Create a new process via the configured process type and add it to process_list.

    All arguments (including extra keyword arguments) are forwarded as keywords
    to the process constructor.
    """
    new_process = self._calculation_process_type(result_queue=result_queue,
                                                 log_file_name=log_file_name,
                                                 parameters=parameters,
                                                 **kwargs)
    self.process_list.append(new_process)
_calculation_process_type
The calculation process type to use.
Definition: calculation.py:36
process_list
The process list (possibly empty, even later)
Definition: calculation.py:31
def wait_for_end(self, display_bar=True, send_notification=False, max_processes=None)
Definition: calculation.py:107
def map_on_processes(self, map_function, index)
Definition: calculation.py:207
def create_widgets_for_all_processes(self, widget_function, index=None)
Definition: calculation.py:298
def append(self, result_queue, log_file_name, parameters, **kwargs)
Definition: calculation.py:368
def show_end_result(self, process, process_bars)
Definition: calculation.py:196
def start_batched_and_wait_for_end(self, max_processes=None)
Definition: calculation.py:59
def __init__(self, process_list=None)
Definition: calculation.py:27
def ensure_running(self, max_processes)
Definition: calculation.py:90
def get(self, name, index=None)
Definition: calculation.py:247