13from hep_ipython_tools
import viewer
22 Create a Calculation from the given Process that handles
23 the status of the process
and the actions like start, stop
or wait_for_end
24 Do
not create instances of this
class by yourself but rather use the
IPythonHandler for this.
28 """ Init with an empty list of processes """
40 Make the class iterable over all single processes
46 """ Return the number of handled processes """
49 def stop(self, index=None):
51 Kill the processes. Please keep in mind that killing a process
is normally accompanied
with data loss.
61 Start part of the processes and wait
for all to finish. If max_processes
is None,
62 only n processes will be started where n
is the number of cores on the current machine.
63 As soon
as one process
is finished, a waiting one will be started
in order to fill all cores
67 max_processes: The number of processes which can be run at the same time
70 max_processes = multiprocessing.cpu_count()
77 Start the processes in the background.
78 Raises an error if the process has been started already.
79 You can
not restart a process. If you want to do so, create another Calculation instance.
82 if not process.already_run:
84 process.already_run =
True
86 raise AssertionError(
"Calculation can not be started twice.")
92 Ensure that the max_processes number of processes is running
and will start
93 processes which are waiting
and have
not been started yet.
98 processes_to_start = max(0, max_processes - len(running_processes))
102 if not process.already_run
and processes_to_start > 0:
104 process.already_run =
True
105 processes_to_start = processes_to_start - 1
107 def wait_for_end(self, display_bar=True, send_notification=False, max_processes=None):
109 Send the calculation into the foreground by halting the notebook as
110 long
as the process
is running. Shows a progress bar
with the number
111 of processed events. Please keep
in mind that you cannot execute
112 cells
in the notebook after calling wait_for_end (before calling it you can,
113 even though a calculation
is running).
117 display_bar: If true, the display bar
is used to show
in the notebook
118 that the computation
is complete.
119 send_notification: If true, the notify2 library will be used to
120 notify the user
if the computation
is complete. This will only
121 work
if the jupyter notebook
is hosted on the local desktop
123 max_processes: The maximum number of processes which will be run on
124 the machine. This has no effect when
start() has been called
125 before. This parameter can
not be used directly, but
134 started_processes = [p
for p
in self.
process_list if p.is_valid
and p.already_run]
135 running_processes = started_processes
137 while len(running_processes) > 0:
142 started_processes = [p
for p
in self.
process_list if p.is_valid
and p.already_run]
143 running_processes = [p
for p
in started_processes
if self.
is_running(p)]
144 ended_processes = [p
for p
in started_processes
if not self.
is_running(p)]
146 for process
in ended_processes:
150 if process
in process_bars:
151 del process_bars[process]
153 for process
in running_processes:
155 if not process.is_valid:
165 elif process.progress_queue_local.poll():
166 result = process.progress_queue_local.recv()
167 if result !=
"end" and display_bar:
168 process_bar = process_bars[process]
169 process_bar.update(result)
172 process.result_queue.fill_results()
173 process.join(timeout=0.01)
182 if send_notification:
186 notify2.init(
"basf2")
187 n = notify2.Notification(
"basf2",
188 "Calculation finished",
189 "notification-message-im"
194 raise ImportError(
"notify2 library must be installed to show Desktop notifications.")
198 Shows the end result (finished or failed)
for all processes
in the process_bars list
200 if process
in process_bars:
201 process_bar = process_bars[process]
203 process_bar.update(
"failed!")
205 process_bar.update(
"finished")
209 Calculate a function on all processes and collect the results
if index
is None.
210 Else calculate the function only on the given process
or the process number.
218 if isinstance(index, int):
221 return map_function(index)
225 Test if the process
is still running
231 Test if the process has finished
237 Test if the process has failed.
241 return process.exitcode != 0
243 raise AssertionError(
"Calculation has not finished.")
247 def get(self, name, index=None):
249 Return the saved queue item with the given name
251 def f(process, name):
253 return process.get(name)
260 Return the names of the items that were saved in the queue
266 Return the log of the process if finished
270 return process.get_log()
272 raise AssertionError(
"Calculation has not been started.")
278 Return a string describing the current status of the calculation
281 if not process.already_run:
294 Return the parameters used to start this calculation
300 Create an overview widget for all processes
or only one
for the given process.
309 if isinstance(index, int):
312 widget = widget_function(index)
318 Show the log of the underlying process(es).
325 raise AssertionError(
"Calculation has not been started.")
331 Show the statistics in a smart manner
335 if "ipython.statistics" in self.
get_keys(process):
344 Show some snapshots on the collections.
345 Remember to add the PrintCollectionsPython Module for that!
349 if "ipython.store_content" in self.
get_keys(process):
358 Return the statistics of the process if finished
362 return self.
get(
"ipython.statistics", process)
364 raise AssertionError(
"Calculation has not finished.")
368 def append(self, result_queue, log_file_name, parameters, **kwargs):
370 Construct a new process with the given parameters
and add it to the process_list.
373 parameters=parameters, **kwargs))