Belle II Software  release-08-01-10
parallel_processing.py
1 #!/usr/bin/env python3
2 
3 
10 
11 # Test the parallel processing functionality by simulating a few events
12 # using 2 processes (so nothing too taxing)
13 
14 import basf2
15 from ROOT import TFile, Belle2
16 from b2test_utils import skip_test_if_light, clean_working_directory
17 skip_test_if_light() # cannot simulate events in a light release
18 
19 
20 class CheckEventNumbers(basf2.Module):
21  """Class to check that we see all events we expect exactly once and nothing else"""
22 
23  def __init__(self, evtNumList):
24  """Remember number of events to process"""
25  super().__init__()
26 
27  self.__evtNumList__evtNumList = evtNumList
28 
29  self.__evtNumbers__evtNumbers = []
30 
31  def event(self):
32  """Accumulate all event numbers we see"""
33  evtNr = Belle2.PyStoreObj("EventMetaData").obj().getEvent()
34  self.__evtNumbers__evtNumbers.append(evtNr)
35 
36  def terminate(self):
37  """Check if event numbers are as they should be"""
38  seen = self.__evtNumbers__evtNumbers
39  should = list(range(1, self.__evtNumList__evtNumList + 1))
40  all_numbers = sorted(set(seen) | set(should))
41  all_ok = True
42  for evtNr in all_numbers:
43  c = seen.count(evtNr)
44  if not (evtNr in should and c == 1):
45  basf2.B2ERROR("event number %d seen %d times" % (evtNr, c))
46  all_ok = False
47  if not all_ok:
48  basf2.B2FATAL("Missing/extra events")
49 
50 
51 main = basf2.Path()
52 # init path
53 main.add_module("EventInfoSetter", evtNumList=[5])
54 particlegun = main.add_module("ParticleGun", pdgCodes=[211, -211, 321, -321],
55  momentumGeneration="fixed", momentumParams=[3])
56 
57 # event path
58 main.add_module("Gearbox")
59 main.add_module("Geometry", components=['MagneticField', 'BeamPipe', 'PXD'], logLevel=basf2.LogLevel.ERROR)
60 simulation = main.add_module("FullSim", logLevel=basf2.LogLevel.ERROR)
61 
62 # output path
63 main.add_module("RootOutput", outputFileName='parallel_processing_test.root')
64 main.add_module("Progress")
65 main.add_module(CheckEventNumbers(5))
66 
67 # test wether flags are what we expect
68 if particlegun.has_properties(basf2.ModulePropFlags.PARALLELPROCESSINGCERTIFIED):
69  basf2.B2FATAL("ParticleGun has pp flag?")
70 if not simulation.has_properties(basf2.ModulePropFlags.PARALLELPROCESSINGCERTIFIED):
71  basf2.B2FATAL("Simulation doesn't have pp flag?")
72 
73 # Process events in one more process than we have events to make sure at least
74 # one of them doesn't get an event
75 basf2.set_nprocesses(5)
76 with clean_working_directory():
77  basf2.process(main)
78 
79  print(basf2.statistics)
80  print(basf2.statistics(basf2.statistics.TOTAL))
81  assert basf2.statistics.get(simulation).calls(basf2.statistics.EVENT) == 5
82  # +1 because of extra call to master module
83  assert basf2.statistics.get_global().calls(basf2.statistics.EVENT) == 6
84 
85  # check wether output file contains correct number of events
86  file = TFile('parallel_processing_test.root')
87  tree = file.Get('tree')
88  if tree.GetEntries() != 5:
89  basf2.B2FATAL('Created output file contains wrong number of events! (' + str(tree.GetEntries()) + ')')
90 
91  nummcparticles = tree.Project("", "MCParticles.m_pdg")
92  if nummcparticles < 5:
93  basf2.B2FATAL('Output file should contain at least five MCParticles!')
94 
95  numhits = tree.Project("", "PXDSimHits.getArrayIndex()")
96  if numhits < 5: # usually much more, existence is most important thing here
97  basf2.B2FATAL('Output file should contain at least 5 hits!')
a (simplified) python wrapper for StoreObjPtr.
Definition: PyStoreObj.h:67
__evtNumList
the number of events so we expect the event numbers 0..(evtNumList-1)
__evtNumbers
event numbers we actually saw