9 from pathlib
import Path
10 from unittest
import main
21 event_data = open(basf2.find_file(
"daq/hbasf2/tests/out.raw"),
"br").read()
24 """Set up port numbers and necessary programs"""
50 "--input", f
"tcp://localhost:{self.distributor_input_port}",
51 "--output", f
"tcp://*:{self.distributor_output_port}",
52 "--monitor", f
"tcp://*:{self.distributor_monitoring_port}"
56 "--input", f
"tcp://*:{self.collector_input_port}",
57 "--output", f
"tcp://*:{self.collector_output_port}",
58 "--monitor", f
"tcp://*:{self.collector_monitoring_port}"
61 "b2hlt_finalcollector",
62 "--input", f
"tcp://*:{self.final_collector_input_port}",
63 "--output", f
"tcp://localhost:{self.final_collector_output_port}",
64 "--monitor", f
"tcp://*:{self.final_collector_monitoring_port}"
67 "python3", basf2.find_file(
"daq/hbasf2/tests/passthrough.no_run_py"),
68 "--input", f
"tcp://localhost:{self.distributor_output_port}",
69 "--output", f
"tcp://localhost:{self.collector_input_port}"
72 "python3", basf2.find_file(
"daq/hbasf2/tests/passthrough.no_run_py"),
73 "--prefix",
"output_",
74 "--input", f
"tcp://localhost:{self.collector_output_port}",
75 "--output", f
"tcp://localhost:{self.final_collector_input_port}"
87 input_identity, _ = self.
recvrecv(input_socket)
90 output_identity, _ = self.
recvrecv(output_socket)
93 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_state",
"connected")
94 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_connects", 1)
95 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_disconnects", 0)
96 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"output.ready_queue_size", 20)
97 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"output.registered_workers", 1)
99 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"input.registered_workers", 1)
100 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"output.ready_queue_size", 20)
101 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"output.registered_workers", 1)
103 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.registered_workers", 1)
110 input_socket.send_multipart([input_identity, self.
event_dataevent_data])
112 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_state",
"connected")
113 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_connects", 1)
114 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_disconnects", 0)
115 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"output.ready_queue_size", 20)
116 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"output.registered_workers", 1)
117 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"output.sent_events", 20)
119 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"input.registered_workers", 1)
120 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"input.received_events", 20)
121 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"output.ready_queue_size", 20)
122 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"output.registered_workers", 1)
123 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"output.sent_events", 20)
125 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.registered_workers", 1)
126 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.received_events", 20)
127 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"output.sent_events", 20)
133 while output_socket.poll(0):
134 _, msg = self.
recvrecv(output_socket)
139 self.assertEqual(len(buffer), 122638 * 20)
142 self.
sendsend(distributor_monitoring_socket,
"l")
144 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_state",
"connected")
145 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_connects", 1)
146 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_disconnects", 0)
147 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"output.ready_queue_size", 20)
148 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"output.registered_workers", 1)
150 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"input.registered_workers", 1)
151 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"input.received_events", 20)
152 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"output.ready_queue_size", 20)
153 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"output.registered_workers", 1)
154 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"input.received_stop_messages", 1)
155 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"input.all_stop_messages", 1)
157 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.registered_workers", 1)
158 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.received_events", 20)
159 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.received_stop_messages", 1)
160 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.all_stop_messages", 1)
170 self.
sendsend(distributor_monitoring_socket,
"l")
172 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_state",
"connected")
173 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_connects", 1)
174 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_disconnects", 0)
175 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"output.ready_queue_size", 20)
176 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"output.registered_workers", 1)
178 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"input.registered_workers", 1)
179 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"input.received_events", 20)
180 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"output.ready_queue_size", 20)
181 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"output.registered_workers", 1)
182 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"input.received_stop_messages", 1)
183 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"input.all_stop_messages", 1)
185 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.registered_workers", 1)
186 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.received_events", 20)
187 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.received_stop_messages", 1)
188 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.all_stop_messages", 1)
195 self.
sendsend(distributor_monitoring_socket,
"n")
196 self.
sendsend(collector_monitoring_socket,
"n")
197 self.
sendsend(final_collector_monitoring_socket,
"n")
199 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_state",
"connected")
200 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_connects", 1)
201 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_disconnects", 0)
202 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"output.ready_queue_size", 20)
203 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"output.registered_workers", 1)
205 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"input.registered_workers", 1)
206 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"input.received_events", 20)
207 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"output.ready_queue_size", 20)
208 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"output.registered_workers", 1)
209 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"input.received_stop_messages", 0)
210 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"input.all_stop_messages", 0)
212 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.registered_workers", 1)
213 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.received_events", 20)
214 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.received_stop_messages", 0)
215 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.all_stop_messages", 0)
219 input_socket.send_multipart([input_identity, self.
event_dataevent_data])
221 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_state",
"connected")
222 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_connects", 1)
223 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_disconnects", 0)
224 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"output.ready_queue_size", 20)
225 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"output.registered_workers", 1)
226 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"output.sent_events", 40)
228 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"input.registered_workers", 1)
229 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"input.received_events", 40)
230 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"output.ready_queue_size", 20)
231 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"output.registered_workers", 1)
232 self.
assertMonitoringassertMonitoring(collector_monitoring_socket,
"output.sent_events", 40)
234 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.registered_workers", 1)
235 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.received_events", 40)
236 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"output.sent_events", 40)
244 while output_socket.poll(0):
245 _, msg = self.
recvrecv(output_socket)
250 self.assertEqual(len(buffer), 122638 * 20)
253 self.
sendsend(distributor_monitoring_socket,
"x")
265 event_data = open(basf2.find_file(
"daq/hbasf2/tests/out.raw"),
"br").read()
268 """Set up port numbers and necessary programs"""
287 "--input", f
"tcp://localhost:{self.distributor_input_port}",
288 "--output", f
"tcp://*:{self.distributor_output_port}",
289 "--monitor", f
"tcp://*:{self.distributor_monitoring_port}"
292 "b2hlt_finalcollector",
"--input", f
"tcp://*:{self.final_collector_input_port}",
293 "--output", f
"tcp://localhost:{self.final_collector_output_port}",
294 "--monitor", f
"tcp://*:{self.final_collector_monitoring_port}"
297 "python3", basf2.find_file(
"daq/hbasf2/tests/passthrough.no_run_py"),
298 "--input", f
"tcp://localhost:{self.distributor_output_port}",
299 "--output", f
"tcp://localhost:{self.final_collector_input_port}"
302 "python3", basf2.find_file(
"daq/hbasf2/tests/passthrough.no_run_py"),
303 "--prefix",
"dying_",
"--exit",
304 "--input", f
"tcp://localhost:{self.distributor_output_port}",
305 "--output", f
"tcp://localhost:{self.final_collector_input_port}"
316 input_identity, _ = self.
recvrecv(input_socket)
319 output_identity, _ = self.
recvrecv(output_socket)
322 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_state",
"connected")
323 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_connects", 1)
324 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_disconnects", 0)
325 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"output.ready_queue_size", 40)
326 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"output.registered_workers", 2)
328 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.registered_workers", 2)
335 input_socket.send_multipart([input_identity, self.
event_dataevent_data])
337 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_state",
"connected")
338 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_connects", 1)
339 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_disconnects", 0)
340 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"output.ready_queue_size", 40)
341 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"output.registered_workers", 2)
342 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"output.sent_events", 100)
344 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.registered_workers", 2)
345 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.received_events", 100)
346 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"output.sent_events", 100)
352 while output_socket.poll(0):
353 _, msg = self.
recvrecv(output_socket)
358 self.assertEqual(len(buffer), 122638 * 100)
361 Path(
"dying_exit_request").touch()
365 input_socket.send_multipart([input_identity, self.
event_dataevent_data])
368 self.
assertIsDownassertIsDown(
"dying_worker", timeout=10)
371 while output_socket.poll(0):
372 _, msg = self.
recvrecv(output_socket)
378 self.assertEqual(len(buffer), 122638 * 79)
381 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.registered_workers", 1)
384 self.
sendsend(distributor_monitoring_socket,
"l")
386 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_state",
"connected")
387 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_connects", 1)
388 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_disconnects", 0)
389 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"output.ready_queue_size", 20)
391 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"output.registered_workers", 2)
392 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"output.sent_events", 200)
394 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.registered_workers", 1)
395 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.received_events", 179)
396 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.received_stop_messages", 1)
397 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.all_stop_messages", 1)
403 self.
sendsend(distributor_monitoring_socket,
"n")
404 self.
sendsend(final_collector_monitoring_socket,
"n")
406 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_state",
"connected")
407 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_connects", 1)
408 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_disconnects", 0)
409 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"output.ready_queue_size", 20)
410 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"output.registered_workers", 2)
412 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.registered_workers", 1)
413 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.received_events", 179)
414 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.received_stop_messages", 0)
415 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.all_stop_messages", 0)
418 input_socket.send_multipart([input_identity, self.
event_dataevent_data])
420 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_state",
"connected")
421 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_connects", 1)
422 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"input.socket_disconnects", 0)
423 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"output.ready_queue_size", 20)
424 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"output.registered_workers", 2)
425 self.
assertMonitoringassertMonitoring(distributor_monitoring_socket,
"output.sent_events", 300)
427 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.registered_workers", 1)
428 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"input.received_events", 279)
429 self.
assertMonitoringassertMonitoring(final_collector_monitoring_socket,
"output.sent_events", 279)
435 while output_socket.poll(0):
436 _, msg = self.
recvrecv(output_socket)
441 self.assertEqual(len(buffer), 122638 * 100)
444 self.
sendsend(distributor_monitoring_socket,
"x")
451 if __name__ ==
'__main__':
453 number_of_failures = 0
455 for i
in range(ZMQ_TEST_FOR_LOOPS):
458 except AssertionError:
459 number_of_failures += 1
462 message = f
'Number of failed for loops: {number_of_failures}/{ZMQ_TEST_FOR_LOOPS}'
463 if number_of_failures <= ZMQ_TEST_MAX_FAILURES:
464 basf2.B2INFO(message)
466 basf2.B2FATAL(message)
final_collector_input_port
final_collector_input_port
distributor_monitoring_port
distributor_monitoring_port
needed_programs
needed_programs
final_collector_output_port
final_collector_output_port
distributor_output_port
distributor_output_port
distributor_input_port
distributor_input_port
final_collector_monitoring_port
final_collector_monitoring_port
final_collector_input_port
final_collector_input_port
distributor_monitoring_port
distributor_monitoring_port
needed_programs
needed_programs
collector_input_port
collector_input_port
final_collector_output_port
final_collector_output_port
distributor_output_port
distributor_output_port
distributor_input_port
distributor_input_port
collector_output_port
collector_output_port
collector_monitoring_port
collector_monitoring_port
final_collector_monitoring_port
final_collector_monitoring_port
def create_socket(port, socket_type=zmq.DEALER, identity="socket", bind=False)
needed_programs
The dict mapping each program name to the command-line arguments of the program to start; it needs to be set in each test.
def assertMonitoring(self, socket, search_key, search_value, timeout=10)
def assertHasOutputFile(self, output_file, unlink=True, timeout=0.5, minimum_delay=0.1)
def send(socket, message_type, first_data=b"", second_data=b"", identity="")
def assertNothingMore(self, socket)
def assertNotHasOutputFile(self, output_file, timeout=0.5)
def assertIsDown(self, name, timeout=5, minimum_delay=0.1)