2 from pathlib
import Path
3 from unittest
import main
14 event_data = open(basf2.find_file(
"daq/hbasf2/tests/out.raw"),
"br").read()
17 """Setup port numbers and necessary programs"""
43 "--input", f
"tcp://localhost:{self.distributor_input_port}",
44 "--output", f
"tcp://*:{self.distributor_output_port}",
45 "--monitor", f
"tcp://*:{self.distributor_monitoring_port}"
49 "--input", f
"tcp://*:{self.collector_input_port}",
50 "--output", f
"tcp://*:{self.collector_output_port}",
51 "--monitor", f
"tcp://*:{self.collector_monitoring_port}"
54 "b2hlt_finalcollector",
55 "--input", f
"tcp://*:{self.final_collector_input_port}",
56 "--output", f
"tcp://localhost:{self.final_collector_output_port}",
57 "--monitor", f
"tcp://*:{self.final_collector_monitoring_port}"
60 "python3", basf2.find_file(
"daq/hbasf2/tests/passthrough.no_run_py"),
61 "--input", f
"tcp://localhost:{self.distributor_output_port}",
62 "--output", f
"tcp://localhost:{self.collector_input_port}"
65 "python3", basf2.find_file(
"daq/hbasf2/tests/passthrough.no_run_py"),
66 "--prefix",
"output_",
67 "--input", f
"tcp://localhost:{self.collector_output_port}",
68 "--output", f
"tcp://localhost:{self.final_collector_input_port}"
80 input_identity, _ = self.
recv(input_socket)
83 output_identity, _ = self.
recv(output_socket)
86 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_state",
"connected")
87 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_connects", 1)
88 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_disconnects", 0)
89 self.
assertMonitoring(distributor_monitoring_socket,
"output.ready_queue_size", 20)
90 self.
assertMonitoring(distributor_monitoring_socket,
"output.registered_workers", 1)
92 self.
assertMonitoring(collector_monitoring_socket,
"input.registered_workers", 1)
93 self.
assertMonitoring(collector_monitoring_socket,
"output.ready_queue_size", 20)
94 self.
assertMonitoring(collector_monitoring_socket,
"output.registered_workers", 1)
96 self.
assertMonitoring(final_collector_monitoring_socket,
"input.registered_workers", 1)
103 input_socket.send_multipart([input_identity, self.
event_data])
105 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_state",
"connected")
106 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_connects", 1)
107 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_disconnects", 0)
108 self.
assertMonitoring(distributor_monitoring_socket,
"output.ready_queue_size", 20)
109 self.
assertMonitoring(distributor_monitoring_socket,
"output.registered_workers", 1)
110 self.
assertMonitoring(distributor_monitoring_socket,
"output.sent_events", 20)
112 self.
assertMonitoring(collector_monitoring_socket,
"input.registered_workers", 1)
113 self.
assertMonitoring(collector_monitoring_socket,
"input.received_events", 20)
114 self.
assertMonitoring(collector_monitoring_socket,
"output.ready_queue_size", 20)
115 self.
assertMonitoring(collector_monitoring_socket,
"output.registered_workers", 1)
116 self.
assertMonitoring(collector_monitoring_socket,
"output.sent_events", 20)
118 self.
assertMonitoring(final_collector_monitoring_socket,
"input.registered_workers", 1)
119 self.
assertMonitoring(final_collector_monitoring_socket,
"input.received_events", 20)
120 self.
assertMonitoring(final_collector_monitoring_socket,
"output.sent_events", 20)
126 while output_socket.poll(0):
127 _, msg = self.
recv(output_socket)
132 self.assertEqual(len(buffer), 122638 * 20)
135 self.
send(distributor_monitoring_socket,
"l")
137 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_state",
"connected")
138 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_connects", 1)
139 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_disconnects", 0)
140 self.
assertMonitoring(distributor_monitoring_socket,
"output.ready_queue_size", 20)
141 self.
assertMonitoring(distributor_monitoring_socket,
"output.registered_workers", 1)
143 self.
assertMonitoring(collector_monitoring_socket,
"input.registered_workers", 1)
144 self.
assertMonitoring(collector_monitoring_socket,
"input.received_events", 20)
145 self.
assertMonitoring(collector_monitoring_socket,
"output.ready_queue_size", 20)
146 self.
assertMonitoring(collector_monitoring_socket,
"output.registered_workers", 1)
147 self.
assertMonitoring(collector_monitoring_socket,
"input.received_stop_messages", 1)
148 self.
assertMonitoring(collector_monitoring_socket,
"input.all_stop_messages", 1)
150 self.
assertMonitoring(final_collector_monitoring_socket,
"input.registered_workers", 1)
151 self.
assertMonitoring(final_collector_monitoring_socket,
"input.received_events", 20)
152 self.
assertMonitoring(final_collector_monitoring_socket,
"input.received_stop_messages", 1)
153 self.
assertMonitoring(final_collector_monitoring_socket,
"input.all_stop_messages", 1)
163 self.
send(distributor_monitoring_socket,
"l")
165 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_state",
"connected")
166 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_connects", 1)
167 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_disconnects", 0)
168 self.
assertMonitoring(distributor_monitoring_socket,
"output.ready_queue_size", 20)
169 self.
assertMonitoring(distributor_monitoring_socket,
"output.registered_workers", 1)
171 self.
assertMonitoring(collector_monitoring_socket,
"input.registered_workers", 1)
172 self.
assertMonitoring(collector_monitoring_socket,
"input.received_events", 20)
173 self.
assertMonitoring(collector_monitoring_socket,
"output.ready_queue_size", 20)
174 self.
assertMonitoring(collector_monitoring_socket,
"output.registered_workers", 1)
175 self.
assertMonitoring(collector_monitoring_socket,
"input.received_stop_messages", 1)
176 self.
assertMonitoring(collector_monitoring_socket,
"input.all_stop_messages", 1)
178 self.
assertMonitoring(final_collector_monitoring_socket,
"input.registered_workers", 1)
179 self.
assertMonitoring(final_collector_monitoring_socket,
"input.received_events", 20)
180 self.
assertMonitoring(final_collector_monitoring_socket,
"input.received_stop_messages", 1)
181 self.
assertMonitoring(final_collector_monitoring_socket,
"input.all_stop_messages", 1)
188 self.
send(distributor_monitoring_socket,
"n")
189 self.
send(collector_monitoring_socket,
"n")
190 self.
send(final_collector_monitoring_socket,
"n")
192 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_state",
"connected")
193 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_connects", 1)
194 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_disconnects", 0)
195 self.
assertMonitoring(distributor_monitoring_socket,
"output.ready_queue_size", 20)
196 self.
assertMonitoring(distributor_monitoring_socket,
"output.registered_workers", 1)
198 self.
assertMonitoring(collector_monitoring_socket,
"input.registered_workers", 1)
199 self.
assertMonitoring(collector_monitoring_socket,
"input.received_events", 20)
200 self.
assertMonitoring(collector_monitoring_socket,
"output.ready_queue_size", 20)
201 self.
assertMonitoring(collector_monitoring_socket,
"output.registered_workers", 1)
202 self.
assertMonitoring(collector_monitoring_socket,
"input.received_stop_messages", 0)
203 self.
assertMonitoring(collector_monitoring_socket,
"input.all_stop_messages", 0)
205 self.
assertMonitoring(final_collector_monitoring_socket,
"input.registered_workers", 1)
206 self.
assertMonitoring(final_collector_monitoring_socket,
"input.received_events", 20)
207 self.
assertMonitoring(final_collector_monitoring_socket,
"input.received_stop_messages", 0)
208 self.
assertMonitoring(final_collector_monitoring_socket,
"input.all_stop_messages", 0)
212 input_socket.send_multipart([input_identity, self.
event_data])
214 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_state",
"connected")
215 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_connects", 1)
216 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_disconnects", 0)
217 self.
assertMonitoring(distributor_monitoring_socket,
"output.ready_queue_size", 20)
218 self.
assertMonitoring(distributor_monitoring_socket,
"output.registered_workers", 1)
219 self.
assertMonitoring(distributor_monitoring_socket,
"output.sent_events", 40)
221 self.
assertMonitoring(collector_monitoring_socket,
"input.registered_workers", 1)
222 self.
assertMonitoring(collector_monitoring_socket,
"input.received_events", 40)
223 self.
assertMonitoring(collector_monitoring_socket,
"output.ready_queue_size", 20)
224 self.
assertMonitoring(collector_monitoring_socket,
"output.registered_workers", 1)
225 self.
assertMonitoring(collector_monitoring_socket,
"output.sent_events", 40)
227 self.
assertMonitoring(final_collector_monitoring_socket,
"input.registered_workers", 1)
228 self.
assertMonitoring(final_collector_monitoring_socket,
"input.received_events", 40)
229 self.
assertMonitoring(final_collector_monitoring_socket,
"output.sent_events", 40)
237 while output_socket.poll(0):
238 _, msg = self.
recv(output_socket)
243 self.assertEqual(len(buffer), 122638 * 20)
246 self.
send(distributor_monitoring_socket,
"x")
258 event_data = open(basf2.find_file(
"daq/hbasf2/tests/out.raw"),
"br").read()
261 """Setup port numbers and necessary programs"""
280 "--input", f
"tcp://localhost:{self.distributor_input_port}",
281 "--output", f
"tcp://*:{self.distributor_output_port}",
282 "--monitor", f
"tcp://*:{self.distributor_monitoring_port}"
285 "b2hlt_finalcollector",
"--input", f
"tcp://*:{self.final_collector_input_port}",
286 "--output", f
"tcp://localhost:{self.final_collector_output_port}",
287 "--monitor", f
"tcp://*:{self.final_collector_monitoring_port}"
290 "python3", basf2.find_file(
"daq/hbasf2/tests/passthrough.no_run_py"),
291 "--input", f
"tcp://localhost:{self.distributor_output_port}",
292 "--output", f
"tcp://localhost:{self.final_collector_input_port}"
295 "python3", basf2.find_file(
"daq/hbasf2/tests/passthrough.no_run_py"),
296 "--prefix",
"dying_",
"--exit",
297 "--input", f
"tcp://localhost:{self.distributor_output_port}",
298 "--output", f
"tcp://localhost:{self.final_collector_input_port}"
309 input_identity, _ = self.
recv(input_socket)
312 output_identity, _ = self.
recv(output_socket)
315 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_state",
"connected")
316 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_connects", 1)
317 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_disconnects", 0)
318 self.
assertMonitoring(distributor_monitoring_socket,
"output.ready_queue_size", 40)
319 self.
assertMonitoring(distributor_monitoring_socket,
"output.registered_workers", 2)
321 self.
assertMonitoring(final_collector_monitoring_socket,
"input.registered_workers", 2)
328 input_socket.send_multipart([input_identity, self.
event_data])
330 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_state",
"connected")
331 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_connects", 1)
332 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_disconnects", 0)
333 self.
assertMonitoring(distributor_monitoring_socket,
"output.ready_queue_size", 40)
334 self.
assertMonitoring(distributor_monitoring_socket,
"output.registered_workers", 2)
335 self.
assertMonitoring(distributor_monitoring_socket,
"output.sent_events", 100)
337 self.
assertMonitoring(final_collector_monitoring_socket,
"input.registered_workers", 2)
338 self.
assertMonitoring(final_collector_monitoring_socket,
"input.received_events", 100)
339 self.
assertMonitoring(final_collector_monitoring_socket,
"output.sent_events", 100)
345 while output_socket.poll(0):
346 _, msg = self.
recv(output_socket)
351 self.assertEqual(len(buffer), 122638 * 100)
354 Path(
"dying_exit_request").touch()
358 input_socket.send_multipart([input_identity, self.
event_data])
364 while output_socket.poll(0):
365 _, msg = self.
recv(output_socket)
371 self.assertEqual(len(buffer), 122638 * 79)
374 self.
assertMonitoring(final_collector_monitoring_socket,
"input.registered_workers", 1)
377 self.
send(distributor_monitoring_socket,
"l")
379 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_state",
"connected")
380 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_connects", 1)
381 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_disconnects", 0)
382 self.
assertMonitoring(distributor_monitoring_socket,
"output.ready_queue_size", 20)
384 self.
assertMonitoring(distributor_monitoring_socket,
"output.registered_workers", 2)
385 self.
assertMonitoring(distributor_monitoring_socket,
"output.sent_events", 200)
387 self.
assertMonitoring(final_collector_monitoring_socket,
"input.registered_workers", 1)
388 self.
assertMonitoring(final_collector_monitoring_socket,
"input.received_events", 179)
389 self.
assertMonitoring(final_collector_monitoring_socket,
"input.received_stop_messages", 1)
390 self.
assertMonitoring(final_collector_monitoring_socket,
"input.all_stop_messages", 1)
396 self.
send(distributor_monitoring_socket,
"n")
397 self.
send(final_collector_monitoring_socket,
"n")
399 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_state",
"connected")
400 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_connects", 1)
401 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_disconnects", 0)
402 self.
assertMonitoring(distributor_monitoring_socket,
"output.ready_queue_size", 20)
403 self.
assertMonitoring(distributor_monitoring_socket,
"output.registered_workers", 2)
405 self.
assertMonitoring(final_collector_monitoring_socket,
"input.registered_workers", 1)
406 self.
assertMonitoring(final_collector_monitoring_socket,
"input.received_events", 179)
407 self.
assertMonitoring(final_collector_monitoring_socket,
"input.received_stop_messages", 0)
408 self.
assertMonitoring(final_collector_monitoring_socket,
"input.all_stop_messages", 0)
411 input_socket.send_multipart([input_identity, self.
event_data])
413 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_state",
"connected")
414 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_connects", 1)
415 self.
assertMonitoring(distributor_monitoring_socket,
"input.socket_disconnects", 0)
416 self.
assertMonitoring(distributor_monitoring_socket,
"output.ready_queue_size", 20)
417 self.
assertMonitoring(distributor_monitoring_socket,
"output.registered_workers", 2)
418 self.
assertMonitoring(distributor_monitoring_socket,
"output.sent_events", 300)
420 self.
assertMonitoring(final_collector_monitoring_socket,
"input.registered_workers", 1)
421 self.
assertMonitoring(final_collector_monitoring_socket,
"input.received_events", 279)
422 self.
assertMonitoring(final_collector_monitoring_socket,
"output.sent_events", 279)
428 while output_socket.poll(0):
429 _, msg = self.
recv(output_socket)
434 self.assertEqual(len(buffer), 122638 * 100)
437 self.
send(distributor_monitoring_socket,
"x")
444 if __name__ ==
'__main__':