Belle II Software development
Calculation Class Reference
Inheritance diagram for Calculation:
Basf2Calculation

Public Member Functions

def __init__ (self, process_list=None)
 
def __iter__ (self)
 
def __len__ (self)
 
def stop (self, index=None)
 
def start_batched_and_wait_for_end (self, max_processes=None)
 
def start (self, index=None)
 
def ensure_running (self, max_processes)
 
def wait_for_end (self, display_bar=True, send_notification=False, max_processes=None)
 
def show_end_result (self, process, process_bars)
 
def map_on_processes (self, map_function, index)
 
def is_running (self, index=None)
 
def is_finished (self, index=None)
 
def has_failed (self, index=None)
 
def get (self, name, index=None)
 
def get_keys (self, index=None)
 
def get_log (self, index=None)
 
def get_status (self, index=None)
 
def get_parameters (self, index=None)
 
def create_widgets_for_all_processes (self, widget_function, index=None)
 
def show_log (self, index=None)
 
def show_statistics (self, index=None)
 
def show_collections (self, index=None)
 
def get_statistics (self, index=None)
 
def append (self, result_queue, log_file_name, parameters, **kwargs)
 

Public Attributes

 process_list
 The process list (possibly empty, even later)
 

Protected Attributes

 _calculation_process_type
 The calculation process type to use.
 

Detailed Description

Create a Calculation from the given Process that handles
the status of the process and the actions like start, stop or wait_for_end
Do not create instances of this class by yourself but rather use the IPythonHandler for this.

Definition at line 19 of file calculation.py.

Constructor & Destructor Documentation

◆ __init__()

def __init__ (   self,
  process_list = None 
)
 Init with an empty list of processes 

Reimplemented in Basf2Calculation.

Definition at line 27 of file calculation.py.

27 def __init__(self, process_list=None):
28 """ Init with an empty list of processes """
29
30 if process_list:
31 self.process_list = process_list
32 else:
33 self.process_list = []
34
35
36 self._calculation_process_type = CalculationProcess
37

Member Function Documentation

◆ __iter__()

def __iter__ (   self)
Make the class iterable over all single processes

Definition at line 38 of file calculation.py.

38 def __iter__(self):
39 """
40 Make the class iterable over all single processes
41 """
42 for process in self.process_list:
43 yield Calculation([process])
44

◆ __len__()

def __len__ (   self)
 Return the number of handled processes 

Definition at line 45 of file calculation.py.

45 def __len__(self):
46 """ Return the number of handled processes """
47 return len(self.process_list)
48

◆ append()

def append (   self,
  result_queue,
  log_file_name,
  parameters,
**  kwargs 
)
Construct a new process with the given parameters and add it to the process_list.

Definition at line 368 of file calculation.py.

368 def append(self, result_queue, log_file_name, parameters, **kwargs):
369 """
370 Construct a new process with the given parameters and add it to the process_list.
371 """
372 self.process_list.append(self._calculation_process_type(result_queue=result_queue, log_file_name=log_file_name,
373 parameters=parameters, **kwargs))

◆ create_widgets_for_all_processes()

def create_widgets_for_all_processes (   self,
  widget_function,
  index = None 
)
Create a overview widget for all processes or only one for the given process.

Definition at line 298 of file calculation.py.

298 def create_widgets_for_all_processes(self, widget_function, index=None):
299 """
300 Create a overview widget for all processes or only one for the given process.
301 """
302
303 if len(self.process_list) == 1:
304 widget = widget_function(self.process_list[0])
305 else:
306 if index is None:
307 widget = viewer.ProcessViewer(list(map(widget_function, self.process_list)))
308 else:
309 if isinstance(index, int):
310 widget = widget_function(self.process_list[index])
311 else:
312 widget = widget_function(index)
313
314 widget.show()
315

◆ ensure_running()

def ensure_running (   self,
  max_processes 
)
Ensure that the max_processes number of processes is running and will start
processes which are waiting and have not been started yet.

Definition at line 90 of file calculation.py.

90 def ensure_running(self, max_processes):
91 """
92 Ensure that the max_processes number of processes is running and will start
93 processes which are waiting and have not been started yet.
94 """
95
96 running_processes = [p for p in self.process_list if self.is_running(p)]
97
98 processes_to_start = max(0, max_processes - len(running_processes))
99
100 for process in self.process_list:
101 # start processes as long as processes_to_start is larger than zero
102 if not process.already_run and processes_to_start > 0:
103 process.start()
104 process.already_run = True
105 processes_to_start = processes_to_start - 1
106

◆ get()

def get (   self,
  name,
  index = None 
)
Return the saved queue item with the given name

Definition at line 247 of file calculation.py.

247 def get(self, name, index=None):
248 """
249 Return the saved queue item with the given name
250 """
251 def f(process, name):
252 try:
253 return process.get(name)
254 except KeyError:
255 return None
256 return self.map_on_processes(lambda process: f(process, name), index)
257

◆ get_keys()

def get_keys (   self,
  index = None 
)
Return the names of the items that were saved in the queue

Definition at line 258 of file calculation.py.

258 def get_keys(self, index=None):
259 """
260 Return the names of the items that were saved in the queue
261 """
262 return self.map_on_processes(lambda process: process.get_keys(), index)
263

◆ get_log()

def get_log (   self,
  index = None 
)
Return the log of the process if finished

Definition at line 264 of file calculation.py.

264 def get_log(self, index=None):
265 """
266 Return the log of the process if finished
267 """
268 def f(process):
269 if self.is_running(process) or self.is_finished(process):
270 return process.get_log()
271 else:
272 raise AssertionError("Calculation has not been started.")
273
274 return self.map_on_processes(f, index)
275

◆ get_parameters()

def get_parameters (   self,
  index = None 
)
Return the parameters used to start this calculation

Definition at line 292 of file calculation.py.

292 def get_parameters(self, index=None):
293 """
294 Return the parameters used to start this calculation
295 """
296 return self.map_on_processes(lambda process: process.parameters, index)
297

◆ get_statistics()

def get_statistics (   self,
  index = None 
)
Return the statistics of the process if finished

Definition at line 356 of file calculation.py.

356 def get_statistics(self, index=None):
357 """
358 Return the statistics of the process if finished
359 """
360 def f(process):
361 if self.is_finished(process) and not self.has_failed(process):
362 return self.get("ipython.statistics", process)
363 else:
364 raise AssertionError("Calculation has not finished.")
365
366 return self.map_on_processes(f, index)
367

◆ get_status()

def get_status (   self,
  index = None 
)
Return a string describing the current status if the calculation

Definition at line 276 of file calculation.py.

276 def get_status(self, index=None):
277 """
278 Return a string describing the current status if the calculation
279 """
280 def f(process):
281 if not process.already_run:
282 return "not started"
283 elif self.is_running(process):
284 return "running"
285 elif self.has_failed(process):
286 return "failed"
287 elif self.is_finished(process):
288 return "finished"
289
290 return self.map_on_processes(f, index)
291

◆ has_failed()

def has_failed (   self,
  index = None 
)
Test if the process has failed.

Definition at line 235 of file calculation.py.

235 def has_failed(self, index=None):
236 """
237 Test if the process has failed.
238 """
239 def f(process):
240 if self.is_finished(process):
241 return process.exitcode != 0
242 else:
243 raise AssertionError("Calculation has not finished.")
244
245 return self.map_on_processes(f, index)
246

◆ is_finished()

def is_finished (   self,
  index = None 
)
Test if the process has finished

Definition at line 229 of file calculation.py.

229 def is_finished(self, index=None):
230 """
231 Test if the process has finished
232 """
233 return self.map_on_processes(lambda process: process.already_run and not self.is_running(process), index)
234

◆ is_running()

def is_running (   self,
  index = None 
)
Test if the process is still running

Definition at line 223 of file calculation.py.

223 def is_running(self, index=None):
224 """
225 Test if the process is still running
226 """
227 return self.map_on_processes(lambda process: process.is_alive(), index)
228

◆ map_on_processes()

def map_on_processes (   self,
  map_function,
  index 
)
Calculate a function on all processes and collect the results if index is None.
Else calculate the function only one the given process or the process number.

Definition at line 207 of file calculation.py.

207 def map_on_processes(self, map_function, index):
208 """
209 Calculate a function on all processes and collect the results if index is None.
210 Else calculate the function only one the given process or the process number.
211 """
212 if len(self.process_list) == 1:
213 return map_function(self.process_list[0])
214 else:
215 if index is None:
216 return list(map(map_function, self.process_list))
217 else:
218 if isinstance(index, int):
219 return map_function(self.process_list[index])
220 else:
221 return map_function(index)
222

◆ show_collections()

def show_collections (   self,
  index = None 
)
Show some snapshots on the collections.
Remember to add the PrintCollectionsPython Module for that!

Definition at line 342 of file calculation.py.

342 def show_collections(self, index=None):
343 """
344 Show some snapshots on the collections.
345 Remember to add the PrintCollectionsPython Module for that!
346 """
347
348 def f(process):
349 if "ipython.store_content" in self.get_keys(process):
350 return viewer.CollectionsViewer(self.get("ipython.store_content", process))
351 else:
352 return None
353
354 self.create_widgets_for_all_processes(f, index)
355

◆ show_end_result()

def show_end_result (   self,
  process,
  process_bars 
)
Shows the end result (finished or failed) for all processes in the process_bars list

Definition at line 196 of file calculation.py.

196 def show_end_result(self, process, process_bars):
197 """
198 Shows the end result (finished or failed) for all processes in the process_bars list
199 """
200 if process in process_bars:
201 process_bar = process_bars[process]
202 if self.has_failed(process):
203 process_bar.update("failed!")
204 else:
205 process_bar.update("finished")
206

◆ show_log()

def show_log (   self,
  index = None 
)
Show the log of the underlying process(es).

Definition at line 316 of file calculation.py.

316 def show_log(self, index=None):
317 """
318 Show the log of the underlying process(es).
319 """
320
321 def f(process):
322 if self.is_running(process) or self.is_finished(process):
323 return viewer.LogViewer(process.get_log())
324 else:
325 raise AssertionError("Calculation has not been started.")
326
327 self.create_widgets_for_all_processes(f, index)
328

◆ show_statistics()

def show_statistics (   self,
  index = None 
)
Show the statistics in a smart manner

Definition at line 329 of file calculation.py.

329 def show_statistics(self, index=None):
330 """
331 Show the statistics in a smart manner
332 """
333
334 def f(process):
335 if "ipython.statistics" in self.get_keys(process):
336 return viewer.StatisticsViewer(self.get("ipython.statistics", process))
337 else:
338 return None
339
340 self.create_widgets_for_all_processes(f, index)
341

◆ start()

def start (   self,
  index = None 
)
Start the processes in the background.
Raises an error of the process has been started already.
You can not restart a process. If you want to do so, create another Calculation instance.

Definition at line 75 of file calculation.py.

75 def start(self, index=None):
76 """
77 Start the processes in the background.
78 Raises an error of the process has been started already.
79 You can not restart a process. If you want to do so, create another Calculation instance.
80 """
81 def f(process):
82 if not process.already_run:
83 process.start()
84 process.already_run = True
85 else:
86 raise AssertionError("Calculation can not be started twice.")
87
88 self.map_on_processes(f, index)
89

◆ start_batched_and_wait_for_end()

def start_batched_and_wait_for_end (   self,
  max_processes = None 
)
Start part of the processes and wait for all to finish. If max_processes is None,
only n processes will be started where n is the number of cores on the current machine.
As soon as one process is finished, a waiting one will be started in order to fill all cores
of the machine.

Parameters:
  max_processes: The number of processes which can be run at the same time

Definition at line 59 of file calculation.py.

59 def start_batched_and_wait_for_end(self, max_processes=None):
60 """
61 Start part of the processes and wait for all to finish. If max_processes is None,
62 only n processes will be started where n is the number of cores on the current machine.
63 As soon as one process is finished, a waiting one will be started in order to fill all cores
64 of the machine.
65
66 Parameters:
67 max_processes: The number of processes which can be run at the same time
68 """
69 if not max_processes:
70 max_processes = multiprocessing.cpu_count()
71
72 self.ensure_running(max_processes)
73 self.wait_for_end(max_processes=max_processes)
74

◆ stop()

def stop (   self,
  index = None 
)
Kill the processes. Please keep in mind that killing a process is normaly accompanied with data loss.

Definition at line 49 of file calculation.py.

49 def stop(self, index=None):
50 """
51 Kill the processes. Please keep in mind that killing a process is normaly accompanied with data loss.
52 """
53 def f(process):
54 if self.is_running(process):
55 process.terminate()
56
57 self.map_on_processes(f, index)
58

◆ wait_for_end()

def wait_for_end (   self,
  display_bar = True,
  send_notification = False,
  max_processes = None 
)
Send the calculation into the foreground by halting the notebook as
long as the process is running.  Shows a progress bar with the number
of processed events.  Please keep in mind that you can not execute
cells in the notebook when having called wait_for_end (but before -
although a calculation is running.)


Parameters:
  display_bar: If true, the display bar is used to show in the notebook
        that the computation is complete.
  send_notification: If true, the notify2 library will be used to
        notify the user if the computation is complete. This will only
        work if the jupyter notebook is hosted on the local desktop
        machine.
  max_processes: The maximum number of processes which will be run on
        the machine. This has no effect when start() has been called
        before.  This parameter can not be used directly, but
        start_batched_and_wait_for_end() should be used.

Definition at line 107 of file calculation.py.

107 def wait_for_end(self, display_bar=True, send_notification=False, max_processes=None):
108 """
109 Send the calculation into the foreground by halting the notebook as
110 long as the process is running. Shows a progress bar with the number
111 of processed events. Please keep in mind that you can not execute
112 cells in the notebook when having called wait_for_end (but before -
113 although a calculation is running.)
114
115
116 Parameters:
117 display_bar: If true, the display bar is used to show in the notebook
118 that the computation is complete.
119 send_notification: If true, the notify2 library will be used to
120 notify the user if the computation is complete. This will only
121 work if the jupyter notebook is hosted on the local desktop
122 machine.
123 max_processes: The maximum number of processes which will be run on
124 the machine. This has no effect when start() has been called
125 before. This parameter can not be used directly, but
126 start_batched_and_wait_for_end() should be used.
127 """
128
129 if display_bar:
130 # Initialize all process bars
131 process_bars = {process: viewer.ProgressBarViewer()
132 for process in self.process_list if process.is_valid}
133
134 started_processes = [p for p in self.process_list if p.is_valid and p.already_run]
135 running_processes = started_processes
136 # Update all process bars as long as minimum one process is running
137 while len(running_processes) > 0:
138
139 if max_processes:
140 self.ensure_running(max_processes)
141
142 started_processes = [p for p in self.process_list if p.is_valid and p.already_run]
143 running_processes = [p for p in started_processes if self.is_running(p)]
144 ended_processes = [p for p in started_processes if not self.is_running(p)]
145
146 for process in ended_processes:
147 if display_bar:
148 self.show_end_result(process, process_bars)
149 # Only update the process bar once
150 if process in process_bars:
151 del process_bars[process]
152
153 for process in running_processes:
154 # Check if the process is valid
155 if not process.is_valid:
156 if display_bar:
157 self.show_end_result(process, process_bars)
158
159 # Check if the process is still running. If not set the end result correctly.
160 elif not self.is_running(process):
161 if display_bar:
162 self.show_end_result(process, process_bars)
163
164 # Check if there are news from the process python module (a new percentage)
165 elif process.progress_queue_local.poll():
166 result = process.progress_queue_local.recv()
167 if result != "end" and display_bar:
168 process_bar = process_bars[process]
169 process_bar.update(result)
170
171 else:
172 process.result_queue.fill_results()
173 process.join(timeout=0.01)
174
175 time.sleep(0.01)
176
177 if display_bar:
178 for process in self.process_list:
179 self.show_end_result(process, process_bars)
180
181 # send notification if requested and notify2 library is available
182 if send_notification:
183 try:
184 import notify2
185
186 notify2.init("basf2")
187 n = notify2.Notification("basf2", # head line
188 "Calculation finished", # Description text
189 "notification-message-im" # Icon name
190 )
191 n.show()
192 except ImportError:
193 # re-throw with more useful message
194 raise ImportError("notify2 library must be installed to show Desktop notifications.")
195

Member Data Documentation

◆ _calculation_process_type

_calculation_process_type
protected

The calculation process type to use.

Definition at line 36 of file calculation.py.

◆ process_list

process_list

The process list (possibly empty, even later)

Definition at line 31 of file calculation.py.


The documentation for this class was generated from the following file: