5 from inspect
import signature
7 from funcsigs
import signature
10 from hep_ipython_tools
import viewer
13 import multiprocessing
19 Create a Calculation from the given Process that handles
20 the status of the process and the actions like start, stop or wait_for_end
21 Do not create instances of this class by yourself but rather use the IPythonHandler for this.
25 """ Init with an empty list of processes """
37 Make the class iterable over all single processes
43 """ Return the number of handled processes """
46 def stop(self, index=None):
48 Kill the processes. Please keep in mind that killing a process is normally accompanied by data loss.
58 Start part of the processes and wait for all to finish. If max_processes is None,
59 only n processes will be started where n is the number of cores on the current machine.
60 As soon as one process is finished, a waiting one will be started in order to fill all cores
64 max_processes: The number of processes which can be run at the same time
67 max_processes = multiprocessing.cpu_count()
74 Start the processes in the background.
75 Raises an error if the process has been started already.
76 You can not restart a process. If you want to do so, create another Calculation instance.
79 if not process.already_run:
81 process.already_run =
True
83 raise AssertionError(
"Calculation can not be started twice.")
89 Ensure that the max_processes number of processes is running and will start
90 processes which are waiting and have not been started yet.
95 processes_to_start = max(0, max_processes - len(running_processes))
99 if not process.already_run
and processes_to_start > 0:
101 process.already_run =
True
102 processes_to_start = processes_to_start - 1
104 def wait_for_end(self, display_bar=True, send_notification=False, max_processes=None):
106 Send the calculation into the foreground by halting the notebook as
107 long as the process is running. Shows a progress bar with the number
108 of processed events. Please keep in mind that you can not execute
109 cells in the notebook after having called wait_for_end (but you can before,
110 even while a calculation is running).
114 display_bar: If true, the display bar is used to show in the notebook
115 that the computation is complete.
116 send_notification: If true, the notify2 library will be used to
117 notify the user if the computation is complete. This will only
118 work if the jupyter notebook is hosted on the local desktop
120 max_processes: The maximum number of processes which will be run on
121 the machine. This has no effect when start() has been called
122 before. This parameter can not be used directly, but
123 start_batched_and_wait_for_end() should be used.
131 started_processes = [p
for p
in self.
process_list if p.is_valid
and p.already_run]
132 running_processes = started_processes
134 while len(running_processes) > 0:
139 started_processes = [p
for p
in self.
process_list if p.is_valid
and p.already_run]
140 running_processes = [p
for p
in started_processes
if self.
is_running(p)]
141 ended_processes = [p
for p
in started_processes
if not self.
is_running(p)]
143 for process
in ended_processes:
147 if process
in process_bars:
148 del process_bars[process]
150 for process
in running_processes:
152 if not process.is_valid:
162 elif process.progress_queue_local.poll():
163 result = process.progress_queue_local.recv()
164 if result !=
"end" and display_bar:
165 process_bar = process_bars[process]
166 process_bar.update(result)
169 process.result_queue.fill_results()
170 process.join(timeout=0.01)
179 if send_notification:
183 notify2.init(
"basf2")
184 n = notify2.Notification(
"basf2",
185 "Calculation finished",
186 "notification-message-im"
191 raise ImportError(
"notify2 library must be installed to show Desktop notifications.")
195 Shows the end result (finished or failed) for all processes in the process_bars list
197 if process
in process_bars:
198 process_bar = process_bars[process]
200 process_bar.update(
"failed!")
202 process_bar.update(
"finished")
206 Calculate a function on all processes and collect the results if index is None.
207 Else calculate the function only on the given process or the process number.
215 if isinstance(index, int):
218 return map_function(index)
222 Test if the process is still running
228 Test if the process has finished
234 Test if the process has failed.
238 return process.exitcode != 0
240 raise AssertionError(
"Calculation has not finished.")
244 def get(self, name, index=None):
246 Return the saved queue item with the given name
248 def f(process, name):
250 return process.get(name)
257 Return the names of the items that were saved in the queue
263 Return the log of the process if finished
267 return process.get_log()
269 raise AssertionError(
"Calculation has not been started.")
275 Return a string describing the current status of the calculation
278 if not process.already_run:
291 Return the parameters used to start this calculation
297 Create an overview widget for all processes or only one for the given process.
306 if isinstance(index, int):
309 widget = widget_function(index)
315 Show the log of the underlying process(es).
322 raise AssertionError(
"Calculation has not been started.")
328 Show the statistics in a smart manner
332 if "ipython.statistics" in self.
get_keys(process):
341 Show some snapshots on the collections.
342 Remember to add the PrintCollectionsPython Module for that!
346 if "ipython.store_content" in self.
get_keys(process):
355 Return the statistics of the process if finished
359 return self.
get(
"ipython.statistics", process)
361 raise AssertionError(
"Calculation has not finished.")
365 def append(self, result_queue, log_file_name, parameters, **kwargs):
367 Construct a new process with the given parameters and add it to the process_list.
370 parameters=parameters, **kwargs))