10 from unittest
import main
17 def check_histogram_output(file_name, expected_factor):
18 """Open the given file name and check if the contained histogram has exactly expected_factor * 2 entries"""
22 root_file = ROOT.TFile(file_name,
"READ")
23 histogram = root_file.Get(
"my_histogram")
25 if expected_factor == 0:
31 if histogram.GetEntries() == 2 * expected_factor:
33 except AttributeError:
45 histogram_data = open(basf2.find_file(
"daq/hbasf2/tests/histos.raw"),
"br").read()
48 "_typename" : "Belle2::EventMetaData",
58 "m_generatedWeight" : 1,
63 """Setup port numbers and necessary programs"""
81 "b2hlt_proxyhistoserver",
"--input", f
"tcp://*:{self.first_input_port}",
82 "--output", f
"tcp://localhost:{self.final_collector_input_port}",
84 "--monitor", f
"tcp://*:{self.first_monitoring_port}"
87 "b2hlt_proxyhistoserver",
88 "--input", f
"tcp://*:{self.second_input_port}",
89 "--output", f
"tcp://localhost:{self.final_collector_input_port}",
91 "--monitor", f
"tcp://*:{self.second_monitoring_port}"
94 "b2hlt_finalhistoserver",
95 "--input", f
"tcp://*:{self.final_collector_input_port}",
96 "--rootFileName",
"outputFile.root",
98 "--monitor", f
"tcp://*:{self.final_collector_monitoring_port}"
119 for input_socket
in input_sockets:
120 self.
sendsend(input_socket,
"h")
124 self.
assertMonitoringassertMonitoring(first_monitoring_socket,
"input.registered_workers", 2)
125 self.
assertMonitoringassertMonitoring(second_monitoring_socket,
"input.registered_workers", 3)
126 self.
assertMonitoringassertMonitoring(final_monitoring_socket,
"input.registered_workers", 2)
129 self.
assertMonitoringassertMonitoring(first_monitoring_socket,
"input.received_stop_messages", 0)
130 self.
assertMonitoringassertMonitoring(second_monitoring_socket,
"input.received_stop_messages", 0)
131 self.
assertMonitoringassertMonitoring(final_monitoring_socket,
"input.received_stop_messages", 0)
133 self.
assertMonitoringassertMonitoring(first_monitoring_socket,
"input.all_stop_messages",
False)
134 self.
assertMonitoringassertMonitoring(second_monitoring_socket,
"input.all_stop_messages",
False)
135 self.
assertMonitoringassertMonitoring(final_monitoring_socket,
"input.all_stop_messages",
False)
139 for input_socket
in input_sockets:
143 self.
assertMonitoringassertMonitoring(first_monitoring_socket,
"input.received_events", 20)
144 self.
assertMonitoringassertMonitoring(second_monitoring_socket,
"input.received_events", 30)
145 self.
assertMonitoringassertMonitoring(final_monitoring_socket,
"input.received_events", 2)
151 for input_socket
in input_sockets:
152 self.
sendsend(input_socket,
"l")
155 self.
assertMonitoringassertMonitoring(first_monitoring_socket,
"input.received_stop_messages", 2)
156 self.
assertMonitoringassertMonitoring(first_monitoring_socket,
"input.all_stop_messages",
True)
157 self.
assertMonitoringassertMonitoring(second_monitoring_socket,
"input.received_stop_messages", 3)
158 self.
assertMonitoringassertMonitoring(second_monitoring_socket,
"input.all_stop_messages",
True)
160 self.
assertMonitoringassertMonitoring(final_monitoring_socket,
"input.received_stop_messages", 2)
161 self.
assertMonitoringassertMonitoring(final_monitoring_socket,
"input.all_stop_messages",
True)
166 self.
sendsend(first_monitoring_socket,
"n")
167 self.
sendsend(second_monitoring_socket,
"n")
168 self.
sendsend(final_monitoring_socket,
"n")
173 self.
assertMonitoringassertMonitoring(first_monitoring_socket,
"input.received_stop_messages", 0)
174 self.
assertMonitoringassertMonitoring(first_monitoring_socket,
"input.all_stop_messages",
False)
175 self.
assertMonitoringassertMonitoring(second_monitoring_socket,
"input.received_stop_messages", 0)
176 self.
assertMonitoringassertMonitoring(second_monitoring_socket,
"input.all_stop_messages",
False)
178 self.
assertMonitoringassertMonitoring(final_monitoring_socket,
"input.received_stop_messages", 0)
179 self.
assertMonitoringassertMonitoring(final_monitoring_socket,
"input.all_stop_messages",
False)
183 for input_socket
in input_sockets[:2]:
191 for input_socket
in input_sockets:
192 self.
sendsend(input_socket,
"x")
204 """Repeatedly call check_histogram_output 5 times until it is actually fulfilled"""
208 if check_histogram_output(file_name, expected_factor):
212 raise AssertionError(
"Even after retry, the output was not correct!")
218 input_port = HLTZMQTestCase.get_free_port()
220 monitoring_port = HLTZMQTestCase.get_free_port()
223 needed_programs = {
"histoserver": [
"b2hlt_finalhistoserver",
"--input", f
"tcp://*:{input_port}",
224 "--rootFileName",
"outputFile.root",
226 "--monitor", f
"tcp://*:{monitoring_port}"],
230 histogram_data = open(basf2.find_file(
"daq/hbasf2/tests/histos.raw"),
"br").read()
233 "_typename" : "Belle2::EventMetaData",
243 "m_generatedWeight" : 1,
252 self.
sendsend(input_socket,
"h")
256 self.
sendsend(second_input_socket,
"h")
260 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.registered_workers", 2)
264 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.received_stop_messages", 0)
265 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.all_stop_messages",
False)
268 self.
sendsend(input_socket,
"l")
270 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.received_stop_messages", 1)
271 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.all_stop_messages",
False)
275 self.
sendsend(second_input_socket,
"l")
277 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.received_stop_messages", 2)
278 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.all_stop_messages",
True)
282 self.
sendsend(monitoring_socket,
"n")
283 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.received_stop_messages", 0)
284 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.all_stop_messages",
False)
306 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.received_events", 6)
310 self.
sendsend(input_socket,
"l")
312 self.
sendsend(second_input_socket,
"l")
316 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.received_stop_messages", 2)
317 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.all_stop_messages",
True)
319 self.assertTrue(check_histogram_output(
"outputFile.root", 2))
322 self.
sendsend(input_socket,
"x")
324 self.
sendsend(second_input_socket,
"x")
333 if __name__ ==
'__main__':
335 number_of_failures = 0
337 for i
in range(ZMQ_TEST_FOR_LOOPS):
340 except AssertionError:
341 number_of_failures += 1
344 message = f
'Number of failed for loops: {number_of_failures}/{ZMQ_TEST_FOR_LOOPS}'
345 if number_of_failures <= ZMQ_TEST_MAX_FAILURES:
346 basf2.B2INFO(message)
348 basf2.B2FATAL(message)
string event_data
event_data
def testStopPropagation(self)
histogram_data
histogram_data
monitoring_port
monitoring_port
final_collector_input_port
final_collector_input_port
string event_data
event_data
first_monitoring_port
first_monitoring_port
needed_programs
needed_programs
final_collector_monitoring_port
final_collector_monitoring_port
second_input_port
second_input_port
first_input_port
first_input_port
def check_histogram_repeated(self, file_name, expected_factor)
second_monitoring_port
second_monitoring_port
histogram_data
histogram_data
def testEventPropagation(self)
def assertIsMsgType(self, socket, message_type, final=True, router=False)
def create_socket(port, socket_type=zmq.DEALER, identity="socket", bind=False)
needed_programs
Dictionary mapping each program name to the command-line arguments used to start it; it must be set in each test.
def assertMonitoring(self, socket, search_key, search_value, timeout=10)
def assertHasOutputFile(self, output_file, unlink=True, timeout=0.5, minimum_delay=0.1)
def send(socket, message_type, first_data=b"", second_data=b"", identity="")
def assertNotHasOutputFile(self, output_file, timeout=0.5)
def assertIsDown(self, name, timeout=5, minimum_delay=0.1)