3 from pathlib
import Path
4 from unittest
import main
12 def check_histogram_output(file_name, expected_factor):
13 """Open the given file name and check if the contained histogram has exactly expected_factor * 2 entries"""
17 root_file = ROOT.TFile(file_name,
"READ")
18 histogram = root_file.Get(
"my_histogram")
20 if expected_factor == 0:
26 if histogram.GetEntries() == 2 * expected_factor:
28 except AttributeError:
40 histogram_data = open(basf2.find_file(
"daq/hbasf2/tests/histos.raw"),
"br").read()
43 "_typename" : "Belle2::EventMetaData",
53 "m_generatedWeight" : 1,
58 """Setup port numbers and necessary programs"""
76 "b2hlt_proxyhistoserver",
"--input", f
"tcp://*:{self.first_input_port}",
77 "--output", f
"tcp://localhost:{self.final_collector_input_port}",
79 "--monitor", f
"tcp://*:{self.first_monitoring_port}"
82 "b2hlt_proxyhistoserver",
83 "--input", f
"tcp://*:{self.second_input_port}",
84 "--output", f
"tcp://localhost:{self.final_collector_input_port}",
86 "--monitor", f
"tcp://*:{self.second_monitoring_port}"
89 "b2hlt_finalhistoserver",
90 "--input", f
"tcp://*:{self.final_collector_input_port}",
91 "--rootFileName",
"outputFile.root",
93 "--monitor", f
"tcp://*:{self.final_collector_monitoring_port}"
114 for input_socket
in input_sockets:
115 self.
send(input_socket,
"h")
119 self.
assertMonitoring(first_monitoring_socket,
"input.registered_workers", 2)
120 self.
assertMonitoring(second_monitoring_socket,
"input.registered_workers", 3)
121 self.
assertMonitoring(final_monitoring_socket,
"input.registered_workers", 2)
124 self.
assertMonitoring(first_monitoring_socket,
"input.received_stop_messages", 0)
125 self.
assertMonitoring(second_monitoring_socket,
"input.received_stop_messages", 0)
126 self.
assertMonitoring(final_monitoring_socket,
"input.received_stop_messages", 0)
128 self.
assertMonitoring(first_monitoring_socket,
"input.all_stop_messages",
False)
129 self.
assertMonitoring(second_monitoring_socket,
"input.all_stop_messages",
False)
130 self.
assertMonitoring(final_monitoring_socket,
"input.all_stop_messages",
False)
134 for input_socket
in input_sockets:
139 self.
assertMonitoring(second_monitoring_socket,
"input.received_events", 30)
146 for input_socket
in input_sockets:
147 self.
send(input_socket,
"l")
150 self.
assertMonitoring(first_monitoring_socket,
"input.received_stop_messages", 2)
151 self.
assertMonitoring(first_monitoring_socket,
"input.all_stop_messages",
True)
152 self.
assertMonitoring(second_monitoring_socket,
"input.received_stop_messages", 3)
153 self.
assertMonitoring(second_monitoring_socket,
"input.all_stop_messages",
True)
155 self.
assertMonitoring(final_monitoring_socket,
"input.received_stop_messages", 2)
156 self.
assertMonitoring(final_monitoring_socket,
"input.all_stop_messages",
True)
161 self.
send(first_monitoring_socket,
"n")
162 self.
send(second_monitoring_socket,
"n")
163 self.
send(final_monitoring_socket,
"n")
168 self.
assertMonitoring(first_monitoring_socket,
"input.received_stop_messages", 0)
169 self.
assertMonitoring(first_monitoring_socket,
"input.all_stop_messages",
False)
170 self.
assertMonitoring(second_monitoring_socket,
"input.received_stop_messages", 0)
171 self.
assertMonitoring(second_monitoring_socket,
"input.all_stop_messages",
False)
173 self.
assertMonitoring(final_monitoring_socket,
"input.received_stop_messages", 0)
174 self.
assertMonitoring(final_monitoring_socket,
"input.all_stop_messages",
False)
178 for input_socket
in input_sockets[:2]:
186 for input_socket
in input_sockets:
187 self.
send(input_socket,
"x")
199 """Repeatedly call check_histogram_output 5 times until it is actually fulfilled"""
203 if check_histogram_output(file_name, expected_factor):
207 raise AssertionError(
"Even after retry, the output was not correct!")
213 input_port = HLTZMQTestCase.get_free_port()
215 monitoring_port = HLTZMQTestCase.get_free_port()
218 needed_programs = {
"histoserver": [
"b2hlt_finalhistoserver",
"--input", f
"tcp://*:{input_port}",
219 "--rootFileName",
"outputFile.root",
221 "--monitor", f
"tcp://*:{monitoring_port}"],
225 histogram_data = open(basf2.find_file(
"daq/hbasf2/tests/histos.raw"),
"br").read()
228 "_typename" : "Belle2::EventMetaData",
238 "m_generatedWeight" : 1,
247 self.
send(input_socket,
"h")
251 self.
send(second_input_socket,
"h")
263 self.
send(input_socket,
"l")
270 self.
send(second_input_socket,
"l")
277 self.
send(monitoring_socket,
"n")
305 self.
send(input_socket,
"l")
307 self.
send(second_input_socket,
"l")
314 self.assertTrue(check_histogram_output(
"outputFile.root", 2))
317 self.
send(input_socket,
"x")
319 self.
send(second_input_socket,
"x")
328 if __name__ ==
'__main__':