Belle II Software  release-05-01-25
test_histogram.py
1 import os
2 import time
3 from pathlib import Path
4 from unittest import main
5 import basf2
6 
7 import zmq
8 
9 from zmq_daq.test_support import HLTZMQTestCase
10 
11 
def check_histogram_output(file_name, expected_factor):
    """Check the number of entries of "my_histogram" in a ROOT file and delete the file.

    Every histogram has 2 entries to start with, so the histogram is expected
    to contain exactly ``2 * expected_factor`` entries. For an
    ``expected_factor`` of 0 the check only passes if no histogram could be
    read from the file at all.

    The file is closed and removed afterwards in every case (also if reading
    the histogram raises), so each call consumes the file it inspects.

    :param file_name: path of the ROOT file to open, check and delete
    :param expected_factor: factor to multiply the 2 start entries with,
        or 0 if no histogram is expected
    :return: True if the file content matches the expectation, else False
    """
    # ROOT is imported lazily, only when a file is actually inspected
    import ROOT

    root_file = ROOT.TFile(file_name, "READ")
    try:
        histogram = root_file.Get("my_histogram")

        if expected_factor == 0:
            # A missing histogram evaluates to False
            return not histogram

        try:
            return histogram.GetEntries() == 2 * expected_factor
        except AttributeError:
            # Whatever was returned is not a histogram: treat it as "not fulfilled"
            return False
    finally:
        # Always release and remove the file, so that a following check
        # only passes on a newly written file
        root_file.Close()
        os.unlink(file_name)
35 
36 
class HistogramTestCase(HLTZMQTestCase):
    """Test the propagation of histograms, stop and terminate messages.

    Two ``b2hlt_proxyhistoserver`` programs ("first" and "second") are
    started with their output pointing to the input of one
    ``b2hlt_finalhistoserver`` ("final_collector"), which is started with
    ``--rootFileName outputFile.root``.
    """

    # Raw histogram payload which is sent with every "v" message.
    # Path.read_bytes() closes the file again, a bare open().read() does not.
    histogram_data = Path(basf2.find_file("daq/hbasf2/tests/histos.raw")).read_bytes()

    # Serialized Belle2::EventMetaData which is sent together with the histograms.
    # The inner whitespace of the literal is kept unchanged on purpose,
    # as the bytes are handed to send() as they are.
    event_data = b"""{
 "_typename" : "Belle2::EventMetaData",
 "fUniqueID" : 0,
 "fBits" : 33554432,
 "m_event" : 1,
 "m_run" : 1,
 "m_subrun" : 0,
 "m_experiment" : 1,
 "m_production" : 0,
 "m_time" : 0,
 "m_parentLfn" : "",
 "m_generatedWeight" : 1,
 "m_errorFlag" : 0
 }"""

    def setUp(self):
        """Setup port numbers and necessary programs"""
        # Input and monitoring port of the first proxy histogram server
        self.first_input_port = HLTZMQTestCase.get_free_port()
        self.first_monitoring_port = HLTZMQTestCase.get_free_port()

        # Input and monitoring port of the second proxy histogram server
        self.second_input_port = HLTZMQTestCase.get_free_port()
        self.second_monitoring_port = HLTZMQTestCase.get_free_port()

        # Input and monitoring port of the final histogram server
        self.final_collector_input_port = HLTZMQTestCase.get_free_port()
        self.final_collector_monitoring_port = HLTZMQTestCase.get_free_port()

        # The programs need to be defined before calling the setUp of the base class
        self.needed_programs = {
            "first": [
                "b2hlt_proxyhistoserver",
                "--input", f"tcp://*:{self.first_input_port}",
                "--output", f"tcp://localhost:{self.final_collector_input_port}",
                "--timeout", "1",
                "--monitor", f"tcp://*:{self.first_monitoring_port}",
            ],
            "second": [
                "b2hlt_proxyhistoserver",
                "--input", f"tcp://*:{self.second_input_port}",
                "--output", f"tcp://localhost:{self.final_collector_input_port}",
                "--timeout", "1",
                "--monitor", f"tcp://*:{self.second_monitoring_port}",
            ],
            "final_collector": [
                "b2hlt_finalhistoserver",
                "--input", f"tcp://*:{self.final_collector_input_port}",
                "--rootFileName", "outputFile.root",
                "--timeout", "1",
                "--monitor", f"tcp://*:{self.final_collector_monitoring_port}",
            ],
        }
        super().setUp()

    def testEventPropagation(self):
        """Send events, stop and terminate messages from five clients and check the merged output"""
        self.assertNotHasOutputFile("outputFile.root")

        first_monitoring_socket = self.create_socket(self.first_monitoring_port)
        second_monitoring_socket = self.create_socket(self.second_monitoring_port)
        final_monitoring_socket = self.create_socket(self.final_collector_monitoring_port)

        # Two clients talk to the first proxy, three to the second one
        input_sockets = [
            self.create_socket(self.first_input_port, identity="1"),
            self.create_socket(self.first_input_port, identity="2"),
            self.create_socket(self.second_input_port, identity="3"),
            self.create_socket(self.second_input_port, identity="4"),
            self.create_socket(self.second_input_port, identity="5"),
        ]

        # Announce every client; each message needs to be answered with a "c"
        for input_socket in input_sockets:
            self.send(input_socket, "h")
            self.assertIsMsgType(input_socket, "c")

        # At the beginning, everything should be at normal state
        self.assertMonitoring(first_monitoring_socket, "input.registered_workers", 2)
        self.assertMonitoring(second_monitoring_socket, "input.registered_workers", 3)
        self.assertMonitoring(final_monitoring_socket, "input.registered_workers", 2)

        # So far no stop messages should be there
        self.assertMonitoring(first_monitoring_socket, "input.received_stop_messages", 0)
        self.assertMonitoring(second_monitoring_socket, "input.received_stop_messages", 0)
        self.assertMonitoring(final_monitoring_socket, "input.received_stop_messages", 0)

        self.assertMonitoring(first_monitoring_socket, "input.all_stop_messages", False)
        self.assertMonitoring(second_monitoring_socket, "input.all_stop_messages", False)
        self.assertMonitoring(final_monitoring_socket, "input.all_stop_messages", False)

        # send some events, which should eventually trigger a merge
        for _ in range(10):
            for input_socket in input_sockets:
                self.send(input_socket, "v", self.histogram_data, self.event_data)
                self.assertIsMsgType(input_socket, "c")

        self.assertMonitoring(first_monitoring_socket, "input.received_events", 20)
        self.assertMonitoring(second_monitoring_socket, "input.received_events", 30)
        self.assertMonitoring(final_monitoring_socket, "input.received_events", 2)

        # Make sure to get the newest data (5 clients have sent histograms)
        self.check_histogram_repeated("outputFile.root", 5)

        # send out stop messages, which should also trigger a merge
        for input_socket in input_sockets:
            self.send(input_socket, "l")
            self.assertIsMsgType(input_socket, "c")

        self.assertMonitoring(first_monitoring_socket, "input.received_stop_messages", 2)
        self.assertMonitoring(first_monitoring_socket, "input.all_stop_messages", True)
        self.assertMonitoring(second_monitoring_socket, "input.received_stop_messages", 3)
        self.assertMonitoring(second_monitoring_socket, "input.all_stop_messages", True)

        self.assertMonitoring(final_monitoring_socket, "input.received_stop_messages", 2)
        self.assertMonitoring(final_monitoring_socket, "input.all_stop_messages", True)
        self.assertHasOutputFile("outputFile.root", unlink=False)
        self.check_histogram_repeated("outputFile.root", 5)

        # Now we clean up
        self.send(first_monitoring_socket, "n")
        self.send(second_monitoring_socket, "n")
        self.send(final_monitoring_socket, "n")

        # which should leave us without a root file again
        self.assertNotHasOutputFile("outputFile.root", timeout=1)

        self.assertMonitoring(first_monitoring_socket, "input.received_stop_messages", 0)
        self.assertMonitoring(first_monitoring_socket, "input.all_stop_messages", False)
        self.assertMonitoring(second_monitoring_socket, "input.received_stop_messages", 0)
        self.assertMonitoring(second_monitoring_socket, "input.all_stop_messages", False)

        self.assertMonitoring(final_monitoring_socket, "input.received_stop_messages", 0)
        self.assertMonitoring(final_monitoring_socket, "input.all_stop_messages", False)

        # and send some more events, but only to the first two clients
        for _ in range(5):
            for input_socket in input_sockets[:2]:
                self.send(input_socket, "v", self.histogram_data, self.event_data)
                self.assertIsMsgType(input_socket, "c")

        # this time we expect fewer entries
        self.check_histogram_repeated("outputFile.root", 2)

        # Now send a terminate message
        for input_socket in input_sockets:
            self.send(input_socket, "x")
            self.assertIsMsgType(input_socket, "c")

        # We expect another merge here
        self.assertHasOutputFile("outputFile.root", unlink=False)
        self.check_histogram_repeated("outputFile.root", 2)

        self.assertIsDown("first")
        self.assertIsDown("second")
        self.assertIsDown("final_collector")

    def check_histogram_repeated(self, file_name, expected_factor):
        """Call check_histogram_output up to 5 times until it is actually fulfilled.

        check_histogram_output deletes the file after every check, so each
        retry first waits (up to 2 seconds) for the output file to show up again.

        :param file_name: path of the ROOT file to wait for and check
        :param expected_factor: passed on to check_histogram_output
        :raises AssertionError: if none of the 5 tries gave the expected content
        """
        for _ in range(5):
            self.assertHasOutputFile(file_name, unlink=False, timeout=2)
            if check_histogram_output(file_name, expected_factor):
                return

        raise AssertionError("Even after retry, the output was not correct!")
208 
209 
class HistogramStopTestCase(HLTZMQTestCase):
    """Test the handling of stop messages of a single ``b2hlt_finalhistoserver`` started without timeout."""

    # The ports are class attributes, so they are picked once when the class body is executed
    # Port the histogram server receives its input on
    input_port = HLTZMQTestCase.get_free_port()
    # Port the histogram server publishes its monitoring on
    monitoring_port = HLTZMQTestCase.get_free_port()

    # The single program needed for this test
    needed_programs = {
        "histoserver": [
            "b2hlt_finalhistoserver",
            "--input", f"tcp://*:{input_port}",
            "--rootFileName", "outputFile.root",
            "--timeout", "0",  # we remove the timeout on purpose
            "--monitor", f"tcp://*:{monitoring_port}",
        ],
    }

    # Raw histogram payload which is sent with every "v" message.
    # Path.read_bytes() closes the file again, a bare open().read() does not.
    histogram_data = Path(basf2.find_file("daq/hbasf2/tests/histos.raw")).read_bytes()

    # Serialized Belle2::EventMetaData which is sent together with the histograms.
    # The inner whitespace of the literal is kept unchanged on purpose,
    # as the bytes are handed to send() as they are.
    event_data = b"""{
 "_typename" : "Belle2::EventMetaData",
 "fUniqueID" : 0,
 "fBits" : 33554432,
 "m_event" : 1,
 "m_run" : 1,
 "m_subrun" : 0,
 "m_experiment" : 1,
 "m_production" : 0,
 "m_time" : 0,
 "m_parentLfn" : "",
 "m_generatedWeight" : 1,
 "m_errorFlag" : 0
 }"""

    def testStopPropagation(self):
        """Check that only stop messages (and not plain events) lead to an output file"""
        monitoring_socket = self.create_socket(self.monitoring_port)

        input_socket = self.create_socket(self.input_port)
        self.send(input_socket, "h")
        self.assertIsMsgType(input_socket, "c")

        second_input_socket = self.create_socket(self.input_port, identity="other_socket")
        self.send(second_input_socket, "h")
        self.assertIsMsgType(second_input_socket, "c")

        # At the beginning, everything should be at normal state
        self.assertMonitoring(monitoring_socket, "input.registered_workers", 2)
        self.assertNotHasOutputFile("outputFile.root", timeout=1)

        # So far no stop messages should be there
        self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 0)
        self.assertMonitoring(monitoring_socket, "input.all_stop_messages", False)

        # the first stop message should not trigger a transmission
        self.send(input_socket, "l")
        self.assertIsMsgType(input_socket, "c")
        self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 1)
        self.assertMonitoring(monitoring_socket, "input.all_stop_messages", False)
        self.assertNotHasOutputFile("outputFile.root", timeout=1)

        # The second stop message should also not, as there are no histograms so far
        self.send(second_input_socket, "l")
        self.assertIsMsgType(second_input_socket, "c")
        self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 2)
        self.assertMonitoring(monitoring_socket, "input.all_stop_messages", True)
        self.assertNotHasOutputFile("outputFile.root", timeout=1)

        # Reset everything
        self.send(monitoring_socket, "n")
        self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 0)
        self.assertMonitoring(monitoring_socket, "input.all_stop_messages", False)

        # Now lets send some events: three per client, interleaved between both clients
        event_senders = (
            input_socket, input_socket, second_input_socket,
            input_socket, second_input_socket, second_input_socket,
        )
        for sender in event_senders:
            self.send(sender, "v", self.histogram_data, self.event_data)
            self.assertIsMsgType(sender, "c")

        # This should not be enough to trigger a merge
        self.assertMonitoring(monitoring_socket, "input.received_events", 6)
        self.assertNotHasOutputFile("outputFile.root", timeout=1)

        # But if we again send the stop messages
        self.send(input_socket, "l")
        self.assertIsMsgType(input_socket, "c")
        self.send(second_input_socket, "l")
        self.assertIsMsgType(second_input_socket, "c")

        # .. it should have merged it. The expected factor is 2 (which means 2 * 2 entries),
        # as we have 2 clients (no matter how often they sent)
        self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 2)
        self.assertMonitoring(monitoring_socket, "input.all_stop_messages", True)
        self.assertHasOutputFile("outputFile.root", unlink=False)
        self.assertTrue(check_histogram_output("outputFile.root", 2))

        # Now send a terminate message
        self.send(input_socket, "x")
        self.assertIsMsgType(input_socket, "c")
        self.send(second_input_socket, "x")
        self.assertIsMsgType(second_input_socket, "c")

        # There should be no merge happening, as the files are already written
        # (the file from before was deleted by check_histogram_output)
        self.assertNotHasOutputFile("outputFile.root")

        self.assertIsDown("histoserver")
326 
327 
if __name__ == '__main__':
    # Run all test cases of this module with the main function of unittest
    main()
test_histogram.HistogramStopTestCase.event_data
string event_data
event_data
Definition: test_histogram.py:227
test_histogram.HistogramStopTestCase
Definition: test_histogram.py:210
test_histogram.HistogramTestCase.setUp
def setUp(self)
Definition: test_histogram.py:57
test_histogram.HistogramTestCase.event_data
string event_data
event_data
Definition: test_histogram.py:42
zmq_daq.test_support.HLTZMQTestCase
Definition: test_support.py:17
zmq_daq.test_support.HLTZMQTestCase.needed_programs
needed_programs
The dict name -> cmd args of the programs to start, needs to be set in each test.
Definition: test_support.py:26
zmq_daq.test_support.HLTZMQTestCase.assertIsDown
def assertIsDown(self, name, timeout=5, minimum_delay=0.1)
Definition: test_support.py:81
zmq_daq.test_support.HLTZMQTestCase.assertIsMsgType
def assertIsMsgType(self, socket, message_type, final=True, router=False)
Definition: test_support.py:203
zmq_daq.test_support.HLTZMQTestCase.create_socket
def create_socket(port, socket_type=zmq.DEALER, identity="socket", bind=False)
Definition: test_support.py:103
test_histogram.HistogramTestCase.second_monitoring_port
second_monitoring_port
second_monitoring_port
Definition: test_histogram.py:67
test_histogram.HistogramTestCase.testEventPropagation
def testEventPropagation(self)
Definition: test_histogram.py:98
test_histogram.HistogramStopTestCase.monitoring_port
monitoring_port
monitoring_port
Definition: test_histogram.py:215
zmq_daq.test_support.HLTZMQTestCase.assertNotHasOutputFile
def assertNotHasOutputFile(self, output_file, timeout=0.5)
Definition: test_support.py:233
test_histogram.HistogramTestCase.first_monitoring_port
first_monitoring_port
first_monitoring_port
Definition: test_histogram.py:62
zmq_daq.test_support.HLTZMQTestCase.assertHasOutputFile
def assertHasOutputFile(self, output_file, unlink=True, timeout=0.5, minimum_delay=0.1)
Definition: test_support.py:215
main
int main(int argc, char **argv)
Run all tests.
Definition: test_main.cc:77
zmq_daq.test_support.HLTZMQTestCase.send
def send(socket, message_type, first_data=b"", second_data=b"", identity="")
Definition: test_support.py:136
test_histogram.HistogramTestCase.histogram_data
histogram_data
histogram_data
Definition: test_histogram.py:40
test_histogram.HistogramTestCase.second_input_port
second_input_port
second_input_port
Definition: test_histogram.py:65
test_histogram.HistogramTestCase.final_collector_monitoring_port
final_collector_monitoring_port
final_collector_monitoring_port
Definition: test_histogram.py:72
test_histogram.HistogramTestCase.final_collector_input_port
final_collector_input_port
final_collector_input_port
Definition: test_histogram.py:70
zmq_daq.test_support
Definition: test_support.py:1
test_histogram.HistogramStopTestCase.input_port
input_port
input_port
Definition: test_histogram.py:213
test_histogram.HistogramStopTestCase.testStopPropagation
def testStopPropagation(self)
Definition: test_histogram.py:242
test_histogram.HistogramTestCase
Definition: test_histogram.py:37
test_histogram.HistogramStopTestCase.histogram_data
histogram_data
histogram_data
Definition: test_histogram.py:225
test_histogram.HistogramTestCase.check_histogram_repeated
def check_histogram_repeated(self, file_name, expected_factor)
Definition: test_histogram.py:198
zmq_daq.test_support.HLTZMQTestCase.assertMonitoring
def assertMonitoring(self, socket, search_key, search_value, timeout=10)
Definition: test_support.py:157
test_histogram.HistogramTestCase.first_input_port
first_input_port
first_input_port
Definition: test_histogram.py:60