Belle II Software  release-06-02-00
parallel_processing.py
1 #!/usr/bin/env python3
2 
3 
10 
11 # Test the parallel processing functionality by simulating a few events
12 # using 2 processes (so nothing too taxing)
13 
14 import os
15 import basf2
16 from ROOT import TFile, Belle2
17 from b2test_utils import skip_test_if_light, clean_working_directory
18 skip_test_if_light() # cannot simulate events in a light release
19 
20 
21 class CheckEventNumbers(basf2.Module):
22  """Class to check that we see all events we expect exactly once and nothing else"""
23 
24  def __init__(self, evtNumList):
25  """Remember number of events to process"""
26  super().__init__()
27 
28  self.__evtNumList__evtNumList = evtNumList
29 
30  self.__evtNumbers__evtNumbers = []
31 
32  def event(self):
33  """Accumulate all event numbers we see"""
34  evtNr = Belle2.PyStoreObj("EventMetaData").obj().getEvent()
35  self.__evtNumbers__evtNumbers.append(evtNr)
36 
37  def terminate(self):
38  """Check if event numbers are as they should be"""
39  seen = self.__evtNumbers__evtNumbers
40  should = list(range(1, self.__evtNumList__evtNumList + 1))
41  all_numbers = sorted(set(seen) | set(should))
42  all_ok = True
43  for evtNr in all_numbers:
44  c = seen.count(evtNr)
45  if not (evtNr in should and c == 1):
46  basf2.B2ERROR("event number %d seen %d times" % (evtNr, c))
47  all_ok = False
48  if not all_ok:
49  basf2.B2FATAL("Missing/extra events")
50 
51 
52 main = basf2.Path()
53 # init path
54 main.add_module("EventInfoSetter", evtNumList=[5])
55 particlegun = main.add_module("ParticleGun", pdgCodes=[211, -211, 321, -321],
56  momentumGeneration="fixed", momentumParams=[3])
57 
58 # event path
59 main.add_module("Gearbox")
60 main.add_module("Geometry", components=['MagneticField', 'BeamPipe', 'PXD'], logLevel=basf2.LogLevel.ERROR)
61 simulation = main.add_module("FullSim", logLevel=basf2.LogLevel.ERROR)
62 
63 # output path
64 main.add_module("RootOutput", outputFileName='parallel_processing_test.root')
65 main.add_module("Progress")
66 main.add_module(CheckEventNumbers(5))
67 
68 # test wether flags are what we expect
69 if particlegun.has_properties(basf2.ModulePropFlags.PARALLELPROCESSINGCERTIFIED):
70  basf2.B2FATAL("ParticleGun has pp flag?")
71 if not simulation.has_properties(basf2.ModulePropFlags.PARALLELPROCESSINGCERTIFIED):
72  basf2.B2FATAL("Simulation doesn't have pp flag?")
73 
74 # Process events in one more process than we have events to make sure at least
75 # one of them doesn't get an event
76 basf2.set_nprocesses(5)
77 with clean_working_directory():
78  basf2.process(main)
79 
80  print(basf2.statistics)
81  print(basf2.statistics(basf2.statistics.TOTAL))
82  assert basf2.statistics.get(simulation).calls(basf2.statistics.EVENT) == 5
83  # +1 because of extra call to master module
84  assert basf2.statistics.get_global().calls(basf2.statistics.EVENT) == 6
85 
86  # check wether output file contains correct number of events
87  file = TFile('parallel_processing_test.root')
88  tree = file.Get('tree')
89  if tree.GetEntries() != 5:
90  basf2.B2FATAL('Created output file contains wrong number of events! (' + str(tree.GetEntries()) + ')')
91 
92  nummcparticles = tree.Project("", "MCParticles.m_pdg")
93  if nummcparticles < 5:
94  basf2.B2FATAL('Output file should contain at least five MCParticles!')
95 
96  numhits = tree.Project("", "PXDSimHits.getArrayIndex()")
97  if numhits < 5: # usually much more, existence is most important thing here
98  basf2.B2FATAL('Output file should contain at least 5 hits!')
a (simplified) python wrapper for StoreObjPtr.
Definition: PyStoreObj.h:67
__evtNumList
the number of events so we expect the event numbers 0..(evtNumList-1)
__evtNumbers
event numbers we actually saw