Belle II Software  release-08-01-10
test_histogram.py
1 
8 
9 import os
10 from unittest import main
11 import basf2
12 
13 
14 from zmq_daq.test_support import HLTZMQTestCase, ZMQ_TEST_FOR_LOOPS, ZMQ_TEST_MAX_FAILURES
15 
16 
17 def check_histogram_output(file_name, expected_factor):
18  """Open the given file name and check if the contained histogram has exactly expected_factor * 2 entries"""
19  result = False
20 
21  import ROOT
22  root_file = ROOT.TFile(file_name, "READ")
23  histogram = root_file.Get("my_histogram")
24 
25  if expected_factor == 0:
26  if not histogram:
27  result = True
28  else:
29  # every histogram has 2 entries from the start with, so we multiply with 2
30  try:
31  if histogram.GetEntries() == 2 * expected_factor:
32  result = True
33  except AttributeError:
34  pass
35 
36  root_file.Close()
37  os.unlink(file_name)
38 
39  return result
40 
41 
43  """Test case"""
44 
45  histogram_data = open(basf2.find_file("daq/hbasf2/tests/histos.raw"), "br").read()
46 
47  event_data = b"""{
48  "_typename" : "Belle2::EventMetaData",
49  "fUniqueID" : 0,
50  "fBits" : 33554432,
51  "m_event" : 1,
52  "m_run" : 1,
53  "m_subrun" : 0,
54  "m_experiment" : 1,
55  "m_production" : 0,
56  "m_time" : 0,
57  "m_parentLfn" : "",
58  "m_generatedWeight" : 1,
59  "m_errorFlag" : 0
60  }"""
61 
62  def setUp(self):
63  """Setup port numbers and necessary programs"""
64 
65  self.first_input_portfirst_input_port = HLTZMQTestCase.get_free_port()
66 
67  self.first_monitoring_portfirst_monitoring_port = HLTZMQTestCase.get_free_port()
68 
69 
70  self.second_input_portsecond_input_port = HLTZMQTestCase.get_free_port()
71 
72  self.second_monitoring_portsecond_monitoring_port = HLTZMQTestCase.get_free_port()
73 
74 
75  self.final_collector_input_portfinal_collector_input_port = HLTZMQTestCase.get_free_port()
76 
77  self.final_collector_monitoring_portfinal_collector_monitoring_port = HLTZMQTestCase.get_free_port()
78 
79  self.needed_programsneeded_programsneeded_programs = {
80  "first": [
81  "b2hlt_proxyhistoserver", "--input", f"tcp://*:{self.first_input_port}",
82  "--output", f"tcp://localhost:{self.final_collector_input_port}",
83  "--timeout", "1",
84  "--monitor", f"tcp://*:{self.first_monitoring_port}"
85  ],
86  "second": [
87  "b2hlt_proxyhistoserver",
88  "--input", f"tcp://*:{self.second_input_port}",
89  "--output", f"tcp://localhost:{self.final_collector_input_port}",
90  "--timeout", "1",
91  "--monitor", f"tcp://*:{self.second_monitoring_port}"
92  ],
93  "final_collector": [
94  "b2hlt_finalhistoserver",
95  "--input", f"tcp://*:{self.final_collector_input_port}",
96  "--rootFileName", "outputFile.root",
97  "--timeout", "1",
98  "--monitor", f"tcp://*:{self.final_collector_monitoring_port}"
99  ],
100  }
101  super().setUp()
102 
104  """test function"""
105  self.assertNotHasOutputFileassertNotHasOutputFile("outputFile.root")
106 
107  first_monitoring_socket = self.create_socketcreate_socket(self.first_monitoring_portfirst_monitoring_port)
108  second_monitoring_socket = self.create_socketcreate_socket(self.second_monitoring_portsecond_monitoring_port)
109  final_monitoring_socket = self.create_socketcreate_socket(self.final_collector_monitoring_portfinal_collector_monitoring_port)
110 
111  input_sockets = [
112  self.create_socketcreate_socket(self.first_input_portfirst_input_port, identity="1"),
113  self.create_socketcreate_socket(self.first_input_portfirst_input_port, identity="2"),
114  self.create_socketcreate_socket(self.second_input_portsecond_input_port, identity="3"),
115  self.create_socketcreate_socket(self.second_input_portsecond_input_port, identity="4"),
116  self.create_socketcreate_socket(self.second_input_portsecond_input_port, identity="5"),
117  ]
118 
119  for input_socket in input_sockets:
120  self.sendsend(input_socket, "h")
121  self.assertIsMsgTypeassertIsMsgType(input_socket, "c")
122 
123  # At the beginning, everything should be at normal state
124  self.assertMonitoringassertMonitoring(first_monitoring_socket, "input.registered_workers", 2)
125  self.assertMonitoringassertMonitoring(second_monitoring_socket, "input.registered_workers", 3)
126  self.assertMonitoringassertMonitoring(final_monitoring_socket, "input.registered_workers", 2)
127 
128  # So far no stop messages should be there
129  self.assertMonitoringassertMonitoring(first_monitoring_socket, "input.received_stop_messages", 0)
130  self.assertMonitoringassertMonitoring(second_monitoring_socket, "input.received_stop_messages", 0)
131  self.assertMonitoringassertMonitoring(final_monitoring_socket, "input.received_stop_messages", 0)
132 
133  self.assertMonitoringassertMonitoring(first_monitoring_socket, "input.all_stop_messages", False)
134  self.assertMonitoringassertMonitoring(second_monitoring_socket, "input.all_stop_messages", False)
135  self.assertMonitoringassertMonitoring(final_monitoring_socket, "input.all_stop_messages", False)
136 
137  # send some events, which should eventually trigger a merge
138  for _ in range(10):
139  for input_socket in input_sockets:
140  self.sendsend(input_socket, "v", self.histogram_datahistogram_data, self.event_dataevent_data)
141  self.assertIsMsgTypeassertIsMsgType(input_socket, "c")
142 
143  self.assertMonitoringassertMonitoring(first_monitoring_socket, "input.received_events", 20)
144  self.assertMonitoringassertMonitoring(second_monitoring_socket, "input.received_events", 30)
145  self.assertMonitoringassertMonitoring(final_monitoring_socket, "input.received_events", 2)
146 
147  # Make sure to get the newest data
148  self.check_histogram_repeatedcheck_histogram_repeated("outputFile.root", 5)
149 
150  # send out stop messages, which should also trigger a merge
151  for input_socket in input_sockets:
152  self.sendsend(input_socket, "l")
153  self.assertIsMsgTypeassertIsMsgType(input_socket, "c")
154 
155  self.assertMonitoringassertMonitoring(first_monitoring_socket, "input.received_stop_messages", 2)
156  self.assertMonitoringassertMonitoring(first_monitoring_socket, "input.all_stop_messages", True)
157  self.assertMonitoringassertMonitoring(second_monitoring_socket, "input.received_stop_messages", 3)
158  self.assertMonitoringassertMonitoring(second_monitoring_socket, "input.all_stop_messages", True)
159 
160  self.assertMonitoringassertMonitoring(final_monitoring_socket, "input.received_stop_messages", 2)
161  self.assertMonitoringassertMonitoring(final_monitoring_socket, "input.all_stop_messages", True)
162  self.assertHasOutputFileassertHasOutputFile("outputFile.root", unlink=False)
163  self.check_histogram_repeatedcheck_histogram_repeated("outputFile.root", 5)
164 
165  # Now we clean up
166  self.sendsend(first_monitoring_socket, "n")
167  self.sendsend(second_monitoring_socket, "n")
168  self.sendsend(final_monitoring_socket, "n")
169 
170  # which should give us a clean root file again
171  self.assertNotHasOutputFileassertNotHasOutputFile("outputFile.root", timeout=1)
172 
173  self.assertMonitoringassertMonitoring(first_monitoring_socket, "input.received_stop_messages", 0)
174  self.assertMonitoringassertMonitoring(first_monitoring_socket, "input.all_stop_messages", False)
175  self.assertMonitoringassertMonitoring(second_monitoring_socket, "input.received_stop_messages", 0)
176  self.assertMonitoringassertMonitoring(second_monitoring_socket, "input.all_stop_messages", False)
177 
178  self.assertMonitoringassertMonitoring(final_monitoring_socket, "input.received_stop_messages", 0)
179  self.assertMonitoringassertMonitoring(final_monitoring_socket, "input.all_stop_messages", False)
180 
181  # and send some more events, but only to the first two clients
182  for _ in range(5):
183  for input_socket in input_sockets[:2]:
184  self.sendsend(input_socket, "v", self.histogram_datahistogram_data, self.event_dataevent_data)
185  self.assertIsMsgTypeassertIsMsgType(input_socket, "c")
186 
187  # this time we expect less entries
188  self.check_histogram_repeatedcheck_histogram_repeated("outputFile.root", 2)
189 
190  # Now send a terminate message
191  for input_socket in input_sockets:
192  self.sendsend(input_socket, "x")
193  self.assertIsMsgTypeassertIsMsgType(input_socket, "c")
194 
195  # We expect another merge here
196  self.assertHasOutputFileassertHasOutputFile("outputFile.root", unlink=False)
197  self.check_histogram_repeatedcheck_histogram_repeated("outputFile.root", 2)
198 
199  self.assertIsDownassertIsDown("first")
200  self.assertIsDownassertIsDown("second")
201  self.assertIsDownassertIsDown("final_collector")
202 
203  def check_histogram_repeated(self, file_name, expected_factor):
204  """Repeatedly call check_histogram_output 5 times until it is actually fulfilled"""
205  tries = 0
206  while tries < 5:
207  self.assertHasOutputFileassertHasOutputFile(file_name, unlink=False, timeout=2)
208  if check_histogram_output(file_name, expected_factor):
209  break
210  tries += 1
211  else:
212  raise AssertionError("Even after retry, the output was not correct!")
213 
214 
216  """Test case"""
217 
218  input_port = HLTZMQTestCase.get_free_port()
219 
220  monitoring_port = HLTZMQTestCase.get_free_port()
221 
222 
223  needed_programs = {"histoserver": ["b2hlt_finalhistoserver", "--input", f"tcp://*:{input_port}",
224  "--rootFileName", "outputFile.root",
225  "--timeout", "0", # we remove the timeout on purpose
226  "--monitor", f"tcp://*:{monitoring_port}"],
227  }
228 
229 
230  histogram_data = open(basf2.find_file("daq/hbasf2/tests/histos.raw"), "br").read()
231 
232  event_data = b"""{
233  "_typename" : "Belle2::EventMetaData",
234  "fUniqueID" : 0,
235  "fBits" : 33554432,
236  "m_event" : 1,
237  "m_run" : 1,
238  "m_subrun" : 0,
239  "m_experiment" : 1,
240  "m_production" : 0,
241  "m_time" : 0,
242  "m_parentLfn" : "",
243  "m_generatedWeight" : 1,
244  "m_errorFlag" : 0
245  }"""
246 
248  """test function"""
249  monitoring_socket = self.create_socketcreate_socket(self.monitoring_portmonitoring_port)
250 
251  input_socket = self.create_socketcreate_socket(self.input_portinput_port)
252  self.sendsend(input_socket, "h")
253  self.assertIsMsgTypeassertIsMsgType(input_socket, "c")
254 
255  second_input_socket = self.create_socketcreate_socket(self.input_portinput_port, identity="other_socket")
256  self.sendsend(second_input_socket, "h")
257  self.assertIsMsgTypeassertIsMsgType(second_input_socket, "c")
258 
259  # At the beginning, everything should be at normal state
260  self.assertMonitoringassertMonitoring(monitoring_socket, "input.registered_workers", 2)
261  self.assertNotHasOutputFileassertNotHasOutputFile("outputFile.root", timeout=1)
262 
263  # So far no stop messages should be there
264  self.assertMonitoringassertMonitoring(monitoring_socket, "input.received_stop_messages", 0)
265  self.assertMonitoringassertMonitoring(monitoring_socket, "input.all_stop_messages", False)
266 
267  # the first stop message should not trigger a transmission
268  self.sendsend(input_socket, "l")
269  self.assertIsMsgTypeassertIsMsgType(input_socket, "c")
270  self.assertMonitoringassertMonitoring(monitoring_socket, "input.received_stop_messages", 1)
271  self.assertMonitoringassertMonitoring(monitoring_socket, "input.all_stop_messages", False)
272  self.assertNotHasOutputFileassertNotHasOutputFile("outputFile.root", timeout=1)
273 
274  # The second stop message should also not, as there are no histograms so far
275  self.sendsend(second_input_socket, "l")
276  self.assertIsMsgTypeassertIsMsgType(second_input_socket, "c")
277  self.assertMonitoringassertMonitoring(monitoring_socket, "input.received_stop_messages", 2)
278  self.assertMonitoringassertMonitoring(monitoring_socket, "input.all_stop_messages", True)
279  self.assertNotHasOutputFileassertNotHasOutputFile("outputFile.root", timeout=1)
280 
281  # Reset everything
282  self.sendsend(monitoring_socket, "n")
283  self.assertMonitoringassertMonitoring(monitoring_socket, "input.received_stop_messages", 0)
284  self.assertMonitoringassertMonitoring(monitoring_socket, "input.all_stop_messages", False)
285 
286  # Now lets send some events
287  self.sendsend(input_socket, "v", self.histogram_datahistogram_data, self.event_dataevent_data)
288  self.assertIsMsgTypeassertIsMsgType(input_socket, "c")
289 
290  self.sendsend(input_socket, "v", self.histogram_datahistogram_data, self.event_dataevent_data)
291  self.assertIsMsgTypeassertIsMsgType(input_socket, "c")
292 
293  self.sendsend(second_input_socket, "v", self.histogram_datahistogram_data, self.event_dataevent_data)
294  self.assertIsMsgTypeassertIsMsgType(second_input_socket, "c")
295 
296  self.sendsend(input_socket, "v", self.histogram_datahistogram_data, self.event_dataevent_data)
297  self.assertIsMsgTypeassertIsMsgType(input_socket, "c")
298 
299  self.sendsend(second_input_socket, "v", self.histogram_datahistogram_data, self.event_dataevent_data)
300  self.assertIsMsgTypeassertIsMsgType(second_input_socket, "c")
301 
302  self.sendsend(second_input_socket, "v", self.histogram_datahistogram_data, self.event_dataevent_data)
303  self.assertIsMsgTypeassertIsMsgType(second_input_socket, "c")
304 
305  # This should not be enough to trigger a merge
306  self.assertMonitoringassertMonitoring(monitoring_socket, "input.received_events", 6)
307  self.assertNotHasOutputFileassertNotHasOutputFile("outputFile.root", timeout=1)
308 
309  # But if we again send the stop messages
310  self.sendsend(input_socket, "l")
311  self.assertIsMsgTypeassertIsMsgType(input_socket, "c")
312  self.sendsend(second_input_socket, "l")
313  self.assertIsMsgTypeassertIsMsgType(second_input_socket, "c")
314 
315  # .. it should have merged it. We expect 2 entries, as we have 2 clients (no matter how often they sent)
316  self.assertMonitoringassertMonitoring(monitoring_socket, "input.received_stop_messages", 2)
317  self.assertMonitoringassertMonitoring(monitoring_socket, "input.all_stop_messages", True)
318  self.assertHasOutputFileassertHasOutputFile("outputFile.root", unlink=False)
319  self.assertTrue(check_histogram_output("outputFile.root", 2))
320 
321  # Now send a terminate message
322  self.sendsend(input_socket, "x")
323  self.assertIsMsgTypeassertIsMsgType(input_socket, "c")
324  self.sendsend(second_input_socket, "x")
325  self.assertIsMsgTypeassertIsMsgType(second_input_socket, "c")
326 
327  # There should be no merge happening, as the files are already written
328  self.assertNotHasOutputFileassertNotHasOutputFile("outputFile.root")
329 
330  self.assertIsDownassertIsDown("histoserver")
331 
332 
333 if __name__ == '__main__':
334 
335  number_of_failures = 0
336 
337  for i in range(ZMQ_TEST_FOR_LOOPS):
338  try:
339  main(exit=False)
340  except AssertionError:
341  number_of_failures += 1
342 
343 
344  message = f'Number of failed for loops: {number_of_failures}/{ZMQ_TEST_FOR_LOOPS}'
345  if number_of_failures <= ZMQ_TEST_MAX_FAILURES:
346  basf2.B2INFO(message)
347  else:
348  basf2.B2FATAL(message)
final_collector_input_port
final_collector_input_port
first_monitoring_port
first_monitoring_port
final_collector_monitoring_port
final_collector_monitoring_port
def check_histogram_repeated(self, file_name, expected_factor)
second_monitoring_port
second_monitoring_port
def assertIsMsgType(self, socket, message_type, final=True, router=False)
def create_socket(port, socket_type=zmq.DEALER, identity="socket", bind=False)
needed_programs
The dict name -> cmd args of the programs to start, needs to be set in each test.
Definition: test_support.py:38
def assertMonitoring(self, socket, search_key, search_value, timeout=10)
def assertHasOutputFile(self, output_file, unlink=True, timeout=0.5, minimum_delay=0.1)
def send(socket, message_type, first_data=b"", second_data=b"", identity="")
def assertNotHasOutputFile(self, output_file, timeout=0.5)
def assertIsDown(self, name, timeout=5, minimum_delay=0.1)
Definition: test_support.py:93
Definition: main.py:1