13 from hep_ipython_tools 
import viewer
 
   16 import multiprocessing
 
   22     Create a Calculation from the given Process that handles 
   23     the status of the process and the actions like start, stop or wait_for_end 
   24     Do not create instances of this class by yourself but rather use the IPythonHandler for this. 
   28         """ Init with an empty list of processes """ 
   40         Make the class iterable over all single processes 
   46         """ Return the number of handled processes """ 
   49     def stop(self, index=None):
 
   51         Kill the processes. Please keep in mind that killing a process is normaly accompanied with data loss. 
   61         Start part of the processes and wait for all to finish. If max_processes is None, 
   62         only n processes will be started where n is the number of cores on the current machine. 
   63         As soon as one process is finished, a waiting one will be started in order to fill all cores 
   67           max_processes: The number of processes which can be run at the same time 
   70             max_processes = multiprocessing.cpu_count()
 
   73         self.
wait_for_endwait_for_end(max_processes=max_processes)
 
   77         Start the processes in the background. 
   78         Raises an error of the process has been started already. 
   79         You can not restart a process. If you want to do so, create another Calculation instance. 
   82             if not process.already_run:
 
   84                 process.already_run = 
True 
   86                 raise AssertionError(
"Calculation can not be started twice.")
 
   92         Ensure that the max_processes number of processes is running and will start 
   93         processes which are waiting and have not been started yet. 
   98         processes_to_start = max(0, max_processes - len(running_processes))
 
  102             if not process.already_run 
and processes_to_start > 0:
 
  104                 process.already_run = 
True 
  105                 processes_to_start = processes_to_start - 1
 
  107     def wait_for_end(self, display_bar=True, send_notification=False, max_processes=None):
 
  109         Send the calculation into the foreground by halting the notebook as 
  110         long as the process is running.  Shows a progress bar with the number 
  111         of processed events.  Please keep in mind that you can not execute 
  112         cells in the notebook when having called wait_for_end (but before - 
  113         although a calculation is running.) 
  117           display_bar: If true, the display bar is used to show in the notebook 
  118                 that the computation is complete. 
  119           send_notification: If true, the notify2 library will be used to 
  120                 notify the user if the computation is complete. This will only 
  121                 work if the jupyter notebook is hosted on the local desktop 
  123           max_processes: The maximum number of processes which will be run on 
  124                 the machine. This has no effect when start() has been called 
  125                 before.  This parameter can not be used directly, but 
  126                 start_batched_and_wait_for_end() should be used. 
  132                             for process 
in self.
process_listprocess_list 
if process.is_valid}
 
  134         started_processes = [p 
for p 
in self.
process_listprocess_list 
if p.is_valid 
and p.already_run]
 
  135         running_processes = started_processes
 
  137         while len(running_processes) > 0:
 
  142             started_processes = [p 
for p 
in self.
process_listprocess_list 
if p.is_valid 
and p.already_run]
 
  143             running_processes = [p 
for p 
in started_processes 
if self.
is_runningis_running(p)]
 
  144             ended_processes = [p 
for p 
in started_processes 
if not self.
is_runningis_running(p)]
 
  146             for process 
in ended_processes:
 
  150                     if process 
in process_bars:
 
  151                         del process_bars[process]
 
  153             for process 
in running_processes:
 
  155                 if not process.is_valid:
 
  165                 elif process.progress_queue_local.poll():
 
  166                     result = process.progress_queue_local.recv()
 
  167                     if result != 
"end" and display_bar:
 
  168                         process_bar = process_bars[process]
 
  169                         process_bar.update(result)
 
  172                     process.result_queue.fill_results()
 
  173                     process.join(timeout=0.01)
 
  182         if send_notification:
 
  186                 notify2.init(
"basf2")
 
  187                 n = notify2.Notification(
"basf2",  
 
  188                                          "Calculation finished",  
 
  189                                          "notification-message-im"    
  194                 raise ImportError(
"notify2 library must be installed to show Desktop notifications.")
 
  198         Shows the end result (finished or failed) for all processes in the process_bars list 
  200         if process 
in process_bars:
 
  201             process_bar = process_bars[process]
 
  203                 process_bar.update(
"failed!")
 
  205                 process_bar.update(
"finished")
 
  209         Calculate a function on all processes and collect the results if index is None. 
  210         Else calculate the function only one the given process or the process number. 
  216                 return list(map(map_function, self.
process_listprocess_list))
 
  218                 if isinstance(index, int):
 
  219                     return map_function(self.
process_listprocess_list[index])
 
  221                     return map_function(index)
 
  225         Test if the process is still running 
  227         return self.
map_on_processesmap_on_processes(
lambda process: process.is_alive(), index)
 
  231         Test if the process has finished 
  233         return self.
map_on_processesmap_on_processes(
lambda process: process.already_run 
and not self.
is_runningis_running(process), index)
 
  237         Test if the process has failed. 
  241                 return process.exitcode != 0
 
  243                 raise AssertionError(
"Calculation has not finished.")
 
  247     def get(self, name, index=None):
 
  249         Return the saved queue item with the given name 
  251         def f(process, name):
 
  253                 return process.get(name)
 
  256         return self.
map_on_processesmap_on_processes(
lambda process: f(process, name), index)
 
  260         Return the names of the items that were saved in the queue 
  262         return self.
map_on_processesmap_on_processes(
lambda process: process.get_keys(), index)
 
  266         Return the log of the process if finished 
  270                 return process.get_log()
 
  272                 raise AssertionError(
"Calculation has not been started.")
 
  278         Return a string describing the current status if the calculation 
  281             if not process.already_run:
 
  294         Return the parameters used to start this calculation 
  296         return self.
map_on_processesmap_on_processes(
lambda process: process.parameters, index)
 
  300         Create a overview widget for all processes or only one for the given process. 
  304             widget = widget_function(self.
process_listprocess_list[0])
 
  309                 if isinstance(index, int):
 
  310                     widget = widget_function(self.
process_listprocess_list[index])
 
  312                     widget = widget_function(index)
 
  318         Show the log of the underlying process(es). 
  325                 raise AssertionError(
"Calculation has not been started.")
 
  331         Show the statistics in a smart manner 
  335             if "ipython.statistics" in self.
get_keysget_keys(process):
 
  344         Show some snapshots on the collections. 
  345         Remember to add the PrintCollectionsPython Module for that! 
  349             if "ipython.store_content" in self.
get_keysget_keys(process):
 
  358         Return the statistics of the process if finished 
  362                 return self.
getget(
"ipython.statistics", process)
 
  364                 raise AssertionError(
"Calculation has not finished.")
 
  368     def append(self, result_queue, log_file_name, parameters, **kwargs):
 
  370         Construct a new process with the given parameters and add it to the process_list. 
  373                                                                 parameters=parameters, **kwargs))