13 from hep_ipython_tools
import viewer
16 import multiprocessing
22 Create a Calculation from the given Process that handles
23 the status of the process and the actions like start, stop or wait_for_end
24 Do not create instances of this class by yourself but rather use the IPythonHandler for this.
28 """ Init with an empty list of processes """
40 Make the class iterable over all single processes
46 """ Return the number of handled processes """
49 def stop(self, index=None):
51 Kill the processes. Please keep in mind that killing a process is normaly accompanied with data loss.
61 Start part of the processes and wait for all to finish. If max_processes is None,
62 only n processes will be started where n is the number of cores on the current machine.
63 As soon as one process is finished, a waiting one will be started in order to fill all cores
67 max_processes: The number of processes which can be run at the same time
70 max_processes = multiprocessing.cpu_count()
73 self.
wait_for_endwait_for_end(max_processes=max_processes)
77 Start the processes in the background.
78 Raises an error of the process has been started already.
79 You can not restart a process. If you want to do so, create another Calculation instance.
82 if not process.already_run:
84 process.already_run =
True
86 raise AssertionError(
"Calculation can not be started twice.")
92 Ensure that the max_processes number of processes is running and will start
93 processes which are waiting and have not been started yet.
98 processes_to_start = max(0, max_processes - len(running_processes))
102 if not process.already_run
and processes_to_start > 0:
104 process.already_run =
True
105 processes_to_start = processes_to_start - 1
107 def wait_for_end(self, display_bar=True, send_notification=False, max_processes=None):
109 Send the calculation into the foreground by halting the notebook as
110 long as the process is running. Shows a progress bar with the number
111 of processed events. Please keep in mind that you can not execute
112 cells in the notebook when having called wait_for_end (but before -
113 although a calculation is running.)
117 display_bar: If true, the display bar is used to show in the notebook
118 that the computation is complete.
119 send_notification: If true, the notify2 library will be used to
120 notify the user if the computation is complete. This will only
121 work if the jupyter notebook is hosted on the local desktop
123 max_processes: The maximum number of processes which will be run on
124 the machine. This has no effect when start() has been called
125 before. This parameter can not be used directly, but
126 start_batched_and_wait_for_end() should be used.
132 for process
in self.
process_listprocess_list
if process.is_valid}
134 started_processes = [p
for p
in self.
process_listprocess_list
if p.is_valid
and p.already_run]
135 running_processes = started_processes
137 while len(running_processes) > 0:
142 started_processes = [p
for p
in self.
process_listprocess_list
if p.is_valid
and p.already_run]
143 running_processes = [p
for p
in started_processes
if self.
is_runningis_running(p)]
144 ended_processes = [p
for p
in started_processes
if not self.
is_runningis_running(p)]
146 for process
in ended_processes:
150 if process
in process_bars:
151 del process_bars[process]
153 for process
in running_processes:
155 if not process.is_valid:
165 elif process.progress_queue_local.poll():
166 result = process.progress_queue_local.recv()
167 if result !=
"end" and display_bar:
168 process_bar = process_bars[process]
169 process_bar.update(result)
172 process.result_queue.fill_results()
173 process.join(timeout=0.01)
182 if send_notification:
186 notify2.init(
"basf2")
187 n = notify2.Notification(
"basf2",
188 "Calculation finished",
189 "notification-message-im"
194 raise ImportError(
"notify2 library must be installed to show Desktop notifications.")
198 Shows the end result (finished or failed) for all processes in the process_bars list
200 if process
in process_bars:
201 process_bar = process_bars[process]
203 process_bar.update(
"failed!")
205 process_bar.update(
"finished")
209 Calculate a function on all processes and collect the results if index is None.
210 Else calculate the function only one the given process or the process number.
216 return list(map(map_function, self.
process_listprocess_list))
218 if isinstance(index, int):
219 return map_function(self.
process_listprocess_list[index])
221 return map_function(index)
225 Test if the process is still running
227 return self.
map_on_processesmap_on_processes(
lambda process: process.is_alive(), index)
231 Test if the process has finished
233 return self.
map_on_processesmap_on_processes(
lambda process: process.already_run
and not self.
is_runningis_running(process), index)
237 Test if the process has failed.
241 return process.exitcode != 0
243 raise AssertionError(
"Calculation has not finished.")
247 def get(self, name, index=None):
249 Return the saved queue item with the given name
251 def f(process, name):
253 return process.get(name)
256 return self.
map_on_processesmap_on_processes(
lambda process: f(process, name), index)
260 Return the names of the items that were saved in the queue
262 return self.
map_on_processesmap_on_processes(
lambda process: process.get_keys(), index)
266 Return the log of the process if finished
270 return process.get_log()
272 raise AssertionError(
"Calculation has not been started.")
278 Return a string describing the current status if the calculation
281 if not process.already_run:
294 Return the parameters used to start this calculation
296 return self.
map_on_processesmap_on_processes(
lambda process: process.parameters, index)
300 Create a overview widget for all processes or only one for the given process.
304 widget = widget_function(self.
process_listprocess_list[0])
309 if isinstance(index, int):
310 widget = widget_function(self.
process_listprocess_list[index])
312 widget = widget_function(index)
318 Show the log of the underlying process(es).
325 raise AssertionError(
"Calculation has not been started.")
331 Show the statistics in a smart manner
335 if "ipython.statistics" in self.
get_keysget_keys(process):
344 Show some snapshots on the collections.
345 Remember to add the PrintCollectionsPython Module for that!
349 if "ipython.store_content" in self.
get_keysget_keys(process):
358 Return the statistics of the process if finished
362 return self.
getget(
"ipython.statistics", process)
364 raise AssertionError(
"Calculation has not finished.")
368 def append(self, result_queue, log_file_name, parameters, **kwargs):
370 Construct a new process with the given parameters and add it to the process_list.
373 parameters=parameters, **kwargs))