Belle II Software development
test_histogram.py
1
8
9import os
10from unittest import main
11import basf2
12
13
14from zmq_daq.test_support import HLTZMQTestCase, ZMQ_TEST_FOR_LOOPS, ZMQ_TEST_MAX_FAILURES
15
16
def check_histogram_output(file_name, expected_factor):
    """Check the entry count of ``my_histogram`` in a ROOT file, then delete the file.

    Args:
        file_name: Path of the ROOT file; it is opened in "READ" mode.
        expected_factor: Expected multiple of the base entry count. ``0``
            means that the file must not contain ``my_histogram`` at all.

    Returns:
        ``True`` if ``expected_factor`` is 0 and the histogram is missing, or
        if the histogram has exactly ``2 * expected_factor`` entries;
        ``False`` otherwise.

    The file is closed and removed in every case, including when the check
    itself raises, so the file handle is never leaked.
    """
    import ROOT

    root_file = ROOT.TFile(file_name, "READ")
    try:
        histogram = root_file.Get("my_histogram")

        if expected_factor == 0:
            return not histogram

        # Every histogram starts out with 2 entries, hence the factor of 2.
        try:
            return histogram.GetEntries() == 2 * expected_factor
        except AttributeError:
            # The object has no GetEntries() (presumably because the
            # histogram is missing from the file): count it as a mismatch.
            return False
    finally:
        root_file.Close()
        os.unlink(file_name)
40
41
43 """Test case"""
44
# Raw content of the histogram test file; the test passes it as the first
# data part of its "v" messages. Read inside a ``with`` block so the file
# is closed again right away.
with open(basf2.find_file("daq/hbasf2/tests/histos.raw"), "br") as histogram_file:
    histogram_data = histogram_file.read()
# Do not keep the closed file object around as an additional attribute.
del histogram_file
46
47 event_data = b"""{
48 "_typename" : "Belle2::EventMetaData",
49 "fUniqueID" : 0,
50 "fBits" : 33554432,
51 "m_event" : 1,
52 "m_run" : 1,
53 "m_subrun" : 0,
54 "m_experiment" : 1,
55 "m_production" : 0,
56 "m_time" : 0,
57 "m_parentLfn" : "",
58 "m_generatedWeight" : 1,
59 "m_errorFlag" : 0
60 }"""
61
def setUp(self):
    """Setup port numbers and necessary programs.

    Six free ports are requested from ``HLTZMQTestCase.get_free_port()``: an
    input and a monitoring port for each of the two proxy histogram servers
    and for the final collector. The command lines of the three programs are
    stored in ``self.needed_programs`` (program name -> list of command-line
    arguments) before the ``setUp`` of the base class is called.
    """
    self.first_input_port = HLTZMQTestCase.get_free_port()
    self.first_monitoring_port = HLTZMQTestCase.get_free_port()

    self.second_input_port = HLTZMQTestCase.get_free_port()
    self.second_monitoring_port = HLTZMQTestCase.get_free_port()

    self.final_collector_input_port = HLTZMQTestCase.get_free_port()
    self.final_collector_monitoring_port = HLTZMQTestCase.get_free_port()

    # Both proxy servers get the input port of the final collector as
    # their --output address.
    collector_address = f"tcp://localhost:{self.final_collector_input_port}"

    self.needed_programs = {
        "first": [
            "b2hlt_proxyhistoserver",
            "--input", f"tcp://*:{self.first_input_port}",
            "--output", collector_address,
            "--timeout", "1",
            "--monitor", f"tcp://*:{self.first_monitoring_port}",
        ],
        "second": [
            "b2hlt_proxyhistoserver",
            "--input", f"tcp://*:{self.second_input_port}",
            "--output", collector_address,
            "--timeout", "1",
            "--monitor", f"tcp://*:{self.second_monitoring_port}",
        ],
        "final_collector": [
            "b2hlt_finalhistoserver",
            "--input", f"tcp://*:{self.final_collector_input_port}",
            "--rootFileName", "outputFile.root",
            "--timeout", "1",
            "--monitor", f"tcp://*:{self.final_collector_monitoring_port}",
        ],
    }
    super().setUp()
102
104 """test function"""
105 self.assertNotHasOutputFile("outputFile.root")
106
107 first_monitoring_socket = self.create_socket(self.first_monitoring_port)
108 second_monitoring_socket = self.create_socket(self.second_monitoring_port)
109 final_monitoring_socket = self.create_socket(self.final_collector_monitoring_port)
110
111 input_sockets = [
112 self.create_socket(self.first_input_port, identity="1"),
113 self.create_socket(self.first_input_port, identity="2"),
114 self.create_socket(self.second_input_port, identity="3"),
115 self.create_socket(self.second_input_port, identity="4"),
116 self.create_socket(self.second_input_port, identity="5"),
117 ]
118
119 for input_socket in input_sockets:
120 self.send(input_socket, "h")
121 self.assertIsMsgType(input_socket, "c")
122
123 # At the beginning, everything should be at normal state
124 self.assertMonitoring(first_monitoring_socket, "input.registered_workers", 2)
125 self.assertMonitoring(second_monitoring_socket, "input.registered_workers", 3)
126 self.assertMonitoring(final_monitoring_socket, "input.registered_workers", 2)
127
128 # So far no stop messages should be there
129 self.assertMonitoring(first_monitoring_socket, "input.received_stop_messages", 0)
130 self.assertMonitoring(second_monitoring_socket, "input.received_stop_messages", 0)
131 self.assertMonitoring(final_monitoring_socket, "input.received_stop_messages", 0)
132
133 self.assertMonitoring(first_monitoring_socket, "input.all_stop_messages", False)
134 self.assertMonitoring(second_monitoring_socket, "input.all_stop_messages", False)
135 self.assertMonitoring(final_monitoring_socket, "input.all_stop_messages", False)
136
137 # send some events, which should eventually trigger a merge
138 for _ in range(10):
139 for input_socket in input_sockets:
140 self.send(input_socket, "v", self.histogram_data, self.event_data)
141 self.assertIsMsgType(input_socket, "c")
142
143 self.assertMonitoring(first_monitoring_socket, "input.received_events", 20)
144 self.assertMonitoring(second_monitoring_socket, "input.received_events", 30)
145 self.assertMonitoring(final_monitoring_socket, "input.received_events", 2)
146
147 # Make sure to get the newest data
148 self.check_histogram_repeated("outputFile.root", 5)
149
150 # send out stop messages, which should also trigger a merge
151 for input_socket in input_sockets:
152 self.send(input_socket, "l")
153 self.assertIsMsgType(input_socket, "c")
154
155 self.assertMonitoring(first_monitoring_socket, "input.received_stop_messages", 2)
156 self.assertMonitoring(first_monitoring_socket, "input.all_stop_messages", True)
157 self.assertMonitoring(second_monitoring_socket, "input.received_stop_messages", 3)
158 self.assertMonitoring(second_monitoring_socket, "input.all_stop_messages", True)
159
160 self.assertMonitoring(final_monitoring_socket, "input.received_stop_messages", 2)
161 self.assertMonitoring(final_monitoring_socket, "input.all_stop_messages", True)
162 self.assertHasOutputFile("outputFile.root", unlink=False)
163 self.check_histogram_repeated("outputFile.root", 5)
164
165 # Now we clean up
166 self.send(first_monitoring_socket, "n")
167 self.send(second_monitoring_socket, "n")
168 self.send(final_monitoring_socket, "n")
169
170 # which should give us a clean root file again
171 self.assertNotHasOutputFile("outputFile.root", timeout=1)
172
173 self.assertMonitoring(first_monitoring_socket, "input.received_stop_messages", 0)
174 self.assertMonitoring(first_monitoring_socket, "input.all_stop_messages", False)
175 self.assertMonitoring(second_monitoring_socket, "input.received_stop_messages", 0)
176 self.assertMonitoring(second_monitoring_socket, "input.all_stop_messages", False)
177
178 self.assertMonitoring(final_monitoring_socket, "input.received_stop_messages", 0)
179 self.assertMonitoring(final_monitoring_socket, "input.all_stop_messages", False)
180
181 # and send some more events, but only to the first two clients
182 for _ in range(5):
183 for input_socket in input_sockets[:2]:
184 self.send(input_socket, "v", self.histogram_data, self.event_data)
185 self.assertIsMsgType(input_socket, "c")
186
187 # this time we expect fewer entries
188 self.check_histogram_repeated("outputFile.root", 2)
189
190 # Now send a terminate message
191 for input_socket in input_sockets:
192 self.send(input_socket, "x")
193 self.assertIsMsgType(input_socket, "c")
194
195 # We expect another merge here
196 self.assertHasOutputFile("outputFile.root", unlink=False)
197 self.check_histogram_repeated("outputFile.root", 2)
198
199 self.assertIsDown("first")
200 self.assertIsDown("second")
201 self.assertIsDown("final_collector")
202
def check_histogram_repeated(self, file_name, expected_factor, max_tries=5):
    """Retry ``check_histogram_output`` until it succeeds or the tries run out.

    Before every attempt the output file has to exist; this is checked with
    ``assertHasOutputFile`` (``unlink=False``, ``timeout=2``).

    Args:
        file_name: Name of the ROOT output file to check.
        expected_factor: Forwarded unchanged to ``check_histogram_output``.
        max_tries: Maximum number of attempts (5 by default).

    Raises:
        AssertionError: If none of the attempts found the expected content.
    """
    for _ in range(max_tries):
        self.assertHasOutputFile(file_name, unlink=False, timeout=2)
        if check_histogram_output(file_name, expected_factor):
            return

    raise AssertionError("Even after retry, the output was not correct!")
213
214
216 """Test case"""
217
# Port handed to the histogram server as its --input address.
input_port = HLTZMQTestCase.get_free_port()
# Port handed to the histogram server as its --monitor address.
monitoring_port = HLTZMQTestCase.get_free_port()

# Program name -> command-line arguments of the program to start.
needed_programs = {
    "histoserver": [
        "b2hlt_finalhistoserver",
        "--input", f"tcp://*:{input_port}",
        "--rootFileName", "outputFile.root",
        "--timeout", "0",  # we remove the timeout on purpose
        "--monitor", f"tcp://*:{monitoring_port}",
    ],
}

# Raw content of the histogram test file; the test passes it as the first
# data part of its "v" messages. Read inside a ``with`` block so the file
# is closed again right away.
with open(basf2.find_file("daq/hbasf2/tests/histos.raw"), "br") as histogram_file:
    histogram_data = histogram_file.read()
# Do not keep the closed file object around as an additional attribute.
del histogram_file
231
232 event_data = b"""{
233 "_typename" : "Belle2::EventMetaData",
234 "fUniqueID" : 0,
235 "fBits" : 33554432,
236 "m_event" : 1,
237 "m_run" : 1,
238 "m_subrun" : 0,
239 "m_experiment" : 1,
240 "m_production" : 0,
241 "m_time" : 0,
242 "m_parentLfn" : "",
243 "m_generatedWeight" : 1,
244 "m_errorFlag" : 0
245 }"""
246
248 """test function"""
249 monitoring_socket = self.create_socket(self.monitoring_port)
250
251 input_socket = self.create_socket(self.input_port)
252 self.send(input_socket, "h")
253 self.assertIsMsgType(input_socket, "c")
254
255 second_input_socket = self.create_socket(self.input_port, identity="other_socket")
256 self.send(second_input_socket, "h")
257 self.assertIsMsgType(second_input_socket, "c")
258
259 # At the beginning, everything should be at normal state
260 self.assertMonitoring(monitoring_socket, "input.registered_workers", 2)
261 self.assertNotHasOutputFile("outputFile.root", timeout=1)
262
263 # So far no stop messages should be there
264 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 0)
265 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", False)
266
267 # the first stop message should not trigger a transmission
268 self.send(input_socket, "l")
269 self.assertIsMsgType(input_socket, "c")
270 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 1)
271 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", False)
272 self.assertNotHasOutputFile("outputFile.root", timeout=1)
273
274 # The second stop message should also not, as there are no histograms so far
275 self.send(second_input_socket, "l")
276 self.assertIsMsgType(second_input_socket, "c")
277 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 2)
278 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", True)
279 self.assertNotHasOutputFile("outputFile.root", timeout=1)
280
281 # Reset everything
282 self.send(monitoring_socket, "n")
283 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 0)
284 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", False)
285
286 # Now let's send some events
287 self.send(input_socket, "v", self.histogram_data, self.event_data)
288 self.assertIsMsgType(input_socket, "c")
289
290 self.send(input_socket, "v", self.histogram_data, self.event_data)
291 self.assertIsMsgType(input_socket, "c")
292
293 self.send(second_input_socket, "v", self.histogram_data, self.event_data)
294 self.assertIsMsgType(second_input_socket, "c")
295
296 self.send(input_socket, "v", self.histogram_data, self.event_data)
297 self.assertIsMsgType(input_socket, "c")
298
299 self.send(second_input_socket, "v", self.histogram_data, self.event_data)
300 self.assertIsMsgType(second_input_socket, "c")
301
302 self.send(second_input_socket, "v", self.histogram_data, self.event_data)
303 self.assertIsMsgType(second_input_socket, "c")
304
305 # This should not be enough to trigger a merge
306 self.assertMonitoring(monitoring_socket, "input.received_events", 6)
307 self.assertNotHasOutputFile("outputFile.root", timeout=1)
308
309 # But if we again send the stop messages
310 self.send(input_socket, "l")
311 self.assertIsMsgType(input_socket, "c")
312 self.send(second_input_socket, "l")
313 self.assertIsMsgType(second_input_socket, "c")
314
315 # ... it should have merged them. We expect a factor of 2 (i.e. 2 * 2 histogram entries), as we have 2 clients (no matter how often they sent)
316 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 2)
317 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", True)
318 self.assertHasOutputFile("outputFile.root", unlink=False)
319 self.assertTrue(check_histogram_output("outputFile.root", 2))
320
321 # Now send a terminate message
322 self.send(input_socket, "x")
323 self.assertIsMsgType(input_socket, "c")
324 self.send(second_input_socket, "x")
325 self.assertIsMsgType(second_input_socket, "c")
326
327 # There should be no merge happening, as the files are already written
328 self.assertNotHasOutputFile("outputFile.root")
329
330 self.assertIsDown("histoserver")
331
332
if __name__ == '__main__':
    # Run the whole test suite ZMQ_TEST_FOR_LOOPS times and tolerate up to
    # ZMQ_TEST_MAX_FAILURES failed runs.
    number_of_failures = 0

    for _ in range(ZMQ_TEST_FOR_LOOPS):
        try:
            program = main(exit=False)
        except AssertionError:
            # Kept from the original code for an AssertionError that escapes
            # the test run itself.
            number_of_failures += 1
        else:
            # unittest.main(exit=False) does not raise when a test fails; the
            # outcome is only available on the returned program's result.
            if not program.result.wasSuccessful():
                number_of_failures += 1

    message = f'Number of failed for loops: {number_of_failures}/{ZMQ_TEST_FOR_LOOPS}'
    if number_of_failures <= ZMQ_TEST_MAX_FAILURES:
        basf2.B2INFO(message)
    else:
        basf2.B2FATAL(message)
HLTZMQTestCase monitoring_port
monitoring_port
HLTZMQTestCase input_port
input_port
final_collector_input_port
final_collector_input_port
first_monitoring_port
first_monitoring_port
open histogram_data
histogram_data
final_collector_monitoring_port
final_collector_monitoring_port
def check_histogram_repeated(self, file_name, expected_factor)
second_monitoring_port
second_monitoring_port
def assertIsMsgType(self, socket, message_type, final=True, router=False)
dict needed_programs
The dict name -> cmd args of the programs to start, needs to be set in each test.
Definition: test_support.py:38
def create_socket(port, socket_type=zmq.DEALER, identity="socket", bind=False)
def assertMonitoring(self, socket, search_key, search_value, timeout=10)
def assertHasOutputFile(self, output_file, unlink=True, timeout=0.5, minimum_delay=0.1)
def send(socket, message_type, first_data=b"", second_data=b"", identity="")
def assertNotHasOutputFile(self, output_file, timeout=0.5)
def assertIsDown(self, name, timeout=5, minimum_delay=0.1)
Definition: test_support.py:93
Definition: main.py:1