Belle II Software  release-08-01-10
test_hlt.py
1 
8 
9 from pathlib import Path
10 from unittest import main
11 import basf2
12 
13 import zmq
14 
15 from zmq_daq.test_support import HLTZMQTestCase, ZMQ_TEST_FOR_LOOPS, ZMQ_TEST_MAX_FAILURES
16 
17 
19  """Test case"""
20 
21  event_data = open(basf2.find_file("daq/hbasf2/tests/out.raw"), "br").read()
22 
23  def setUp(self):
24  """Setup port numbers and necessary programs"""
25 
26  self.distributor_input_portdistributor_input_port = HLTZMQTestCase.get_free_port()
27 
28  self.distributor_output_portdistributor_output_port = HLTZMQTestCase.get_free_port()
29 
30  self.distributor_monitoring_portdistributor_monitoring_port = HLTZMQTestCase.get_free_port()
31 
32 
33  self.collector_input_portcollector_input_port = HLTZMQTestCase.get_free_port()
34 
35  self.collector_output_portcollector_output_port = HLTZMQTestCase.get_free_port()
36 
37  self.collector_monitoring_portcollector_monitoring_port = HLTZMQTestCase.get_free_port()
38 
39 
40  self.final_collector_input_portfinal_collector_input_port = HLTZMQTestCase.get_free_port()
41 
42  self.final_collector_output_portfinal_collector_output_port = HLTZMQTestCase.get_free_port()
43 
44  self.final_collector_monitoring_portfinal_collector_monitoring_port = HLTZMQTestCase.get_free_port()
45 
46 
47  self.needed_programsneeded_programsneeded_programs = {
48  "distributor": [
49  "b2hlt_distributor",
50  "--input", f"tcp://localhost:{self.distributor_input_port}",
51  "--output", f"tcp://*:{self.distributor_output_port}",
52  "--monitor", f"tcp://*:{self.distributor_monitoring_port}"
53  ],
54  "collector": [
55  "b2hlt_collector",
56  "--input", f"tcp://*:{self.collector_input_port}",
57  "--output", f"tcp://*:{self.collector_output_port}",
58  "--monitor", f"tcp://*:{self.collector_monitoring_port}"
59  ],
60  "final_collector": [
61  "b2hlt_finalcollector",
62  "--input", f"tcp://*:{self.final_collector_input_port}",
63  "--output", f"tcp://localhost:{self.final_collector_output_port}",
64  "--monitor", f"tcp://*:{self.final_collector_monitoring_port}"
65  ],
66  "worker": [
67  "python3", basf2.find_file("daq/hbasf2/tests/passthrough.no_run_py"),
68  "--input", f"tcp://localhost:{self.distributor_output_port}",
69  "--output", f"tcp://localhost:{self.collector_input_port}"
70  ],
71  "output_worker": [
72  "python3", basf2.find_file("daq/hbasf2/tests/passthrough.no_run_py"),
73  "--prefix", "output_",
74  "--input", f"tcp://localhost:{self.collector_output_port}",
75  "--output", f"tcp://localhost:{self.final_collector_input_port}"
76  ],
77  }
78  super().setUp()
79 
80  def testFullRun(self):
81  """test function"""
82  distributor_monitoring_socket = self.create_socketcreate_socket(self.distributor_monitoring_portdistributor_monitoring_port)
83  collector_monitoring_socket = self.create_socketcreate_socket(self.collector_monitoring_portcollector_monitoring_port)
84  final_collector_monitoring_socket = self.create_socketcreate_socket(self.final_collector_monitoring_portfinal_collector_monitoring_port)
85 
86  input_socket = self.create_socketcreate_socket(self.distributor_input_portdistributor_input_port, socket_type=zmq.STREAM, bind=True)
87  input_identity, _ = self.recvrecv(input_socket)
88 
89  output_socket = self.create_socketcreate_socket(self.final_collector_output_portfinal_collector_output_port, socket_type=zmq.STREAM, bind=True)
90  output_identity, _ = self.recvrecv(output_socket)
91 
92  # At the beginning, everything should be at normal state
93  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
94  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
95  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
96  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
97  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "output.registered_workers", 1)
98 
99  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "input.registered_workers", 1)
100  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "output.ready_queue_size", 20)
101  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "output.registered_workers", 1)
102 
103  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
104 
105  self.assertHasOutputFileassertHasOutputFile("initialize_called", timeout=1)
106  self.assertHasOutputFileassertHasOutputFile("output_initialize_called", timeout=1)
107 
108  # Now lets try sending some events
109  for _ in range(20):
110  input_socket.send_multipart([input_identity, self.event_dataevent_data])
111 
112  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
113  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
114  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
115  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
116  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "output.registered_workers", 1)
117  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "output.sent_events", 20)
118 
119  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "input.registered_workers", 1)
120  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "input.received_events", 20)
121  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "output.ready_queue_size", 20)
122  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "output.registered_workers", 1)
123  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "output.sent_events", 20)
124 
125  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
126  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.received_events", 20)
127  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "output.sent_events", 20)
128 
129  self.assertHasOutputFileassertHasOutputFile("beginrun_called", timeout=1)
130  self.assertHasOutputFileassertHasOutputFile("output_beginrun_called", timeout=1)
131 
132  buffer = b""
133  while output_socket.poll(0):
134  _, msg = self.recvrecv(output_socket)
135  buffer += msg
136  self.assertNothingMoreassertNothingMore(output_socket)
137 
138  # Data Size != raw data, as data is in different format, size taken from test itself
139  self.assertEqual(len(buffer), 122638 * 20)
140 
141  # Now we stop this run
142  self.sendsend(distributor_monitoring_socket, "l")
143 
144  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
145  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
146  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
147  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
148  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "output.registered_workers", 1)
149 
150  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "input.registered_workers", 1)
151  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "input.received_events", 20)
152  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "output.ready_queue_size", 20)
153  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "output.registered_workers", 1)
154  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "input.received_stop_messages", 1)
155  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "input.all_stop_messages", 1)
156 
157  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
158  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.received_events", 20)
159  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.received_stop_messages", 1)
160  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.all_stop_messages", 1)
161 
162  # should go to the workers
163  self.assertHasOutputFileassertHasOutputFile("endrun_called", timeout=1)
164  self.assertHasOutputFileassertHasOutputFile("output_endrun_called", timeout=1)
165 
166  # but not to the output
167  self.assertNothingMoreassertNothingMore(output_socket)
168 
169  # a second stop message should only reach the first workers up to the collector
170  self.sendsend(distributor_monitoring_socket, "l")
171 
172  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
173  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
174  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
175  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
176  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "output.registered_workers", 1)
177 
178  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "input.registered_workers", 1)
179  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "input.received_events", 20)
180  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "output.ready_queue_size", 20)
181  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "output.registered_workers", 1)
182  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "input.received_stop_messages", 1)
183  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "input.all_stop_messages", 1)
184 
185  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
186  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.received_events", 20)
187  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.received_stop_messages", 1)
188  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.all_stop_messages", 1)
189 
190  # should go to the workers (longer timeout as distributor waits a bit)
191  self.assertHasOutputFileassertHasOutputFile("endrun_called", timeout=5)
192  self.assertNotHasOutputFileassertNotHasOutputFile("output_endrun_called", timeout=1)
193 
194  # Now lets restart the run
195  self.sendsend(distributor_monitoring_socket, "n")
196  self.sendsend(collector_monitoring_socket, "n")
197  self.sendsend(final_collector_monitoring_socket, "n")
198 
199  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
200  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
201  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
202  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
203  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "output.registered_workers", 1)
204 
205  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "input.registered_workers", 1)
206  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "input.received_events", 20)
207  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "output.ready_queue_size", 20)
208  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "output.registered_workers", 1)
209  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "input.received_stop_messages", 0)
210  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "input.all_stop_messages", 0)
211 
212  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
213  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.received_events", 20)
214  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.received_stop_messages", 0)
215  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.all_stop_messages", 0)
216 
217  # And send some more events
218  for _ in range(20):
219  input_socket.send_multipart([input_identity, self.event_dataevent_data])
220 
221  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
222  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
223  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
224  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
225  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "output.registered_workers", 1)
226  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "output.sent_events", 40)
227 
228  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "input.registered_workers", 1)
229  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "input.received_events", 40)
230  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "output.ready_queue_size", 20)
231  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "output.registered_workers", 1)
232  self.assertMonitoringassertMonitoring(collector_monitoring_socket, "output.sent_events", 40)
233 
234  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
235  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.received_events", 40)
236  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "output.sent_events", 40)
237 
238  self.assertNotHasOutputFileassertNotHasOutputFile("endrun_called", timeout=1)
239  self.assertNotHasOutputFileassertNotHasOutputFile("output_endrun_called", timeout=1)
240  self.assertHasOutputFileassertHasOutputFile("beginrun_called", timeout=1)
241  self.assertHasOutputFileassertHasOutputFile("output_beginrun_called", timeout=1)
242 
243  buffer = b""
244  while output_socket.poll(0):
245  _, msg = self.recvrecv(output_socket)
246  buffer += msg
247  self.assertNothingMoreassertNothingMore(output_socket)
248 
249  # Data Size != raw data, as data is in different format, size taken from test itself
250  self.assertEqual(len(buffer), 122638 * 20)
251 
252  # And finally: terminate everything
253  self.sendsend(distributor_monitoring_socket, "x")
254 
255  self.assertIsDownassertIsDown("collector")
256  self.assertIsDownassertIsDown("final_collector")
257  self.assertIsDownassertIsDown("distributor")
258  self.assertIsDownassertIsDown("worker", timeout=1)
259  self.assertIsDownassertIsDown("output_worker")
260 
261 
263  """Test case"""
264 
265  event_data = open(basf2.find_file("daq/hbasf2/tests/out.raw"), "br").read()
266 
267  def setUp(self):
268  """Setup port numbers and necessary programs"""
269 
270  self.distributor_input_portdistributor_input_port = HLTZMQTestCase.get_free_port()
271 
272  self.distributor_output_portdistributor_output_port = HLTZMQTestCase.get_free_port()
273 
274  self.distributor_monitoring_portdistributor_monitoring_port = HLTZMQTestCase.get_free_port()
275 
276 
277  self.final_collector_input_portfinal_collector_input_port = HLTZMQTestCase.get_free_port()
278 
279  self.final_collector_output_portfinal_collector_output_port = HLTZMQTestCase.get_free_port()
280 
281  self.final_collector_monitoring_portfinal_collector_monitoring_port = HLTZMQTestCase.get_free_port()
282 
283 
284  self.needed_programsneeded_programsneeded_programs = {
285  "distributor": [
286  "b2hlt_distributor",
287  "--input", f"tcp://localhost:{self.distributor_input_port}",
288  "--output", f"tcp://*:{self.distributor_output_port}",
289  "--monitor", f"tcp://*:{self.distributor_monitoring_port}"
290  ],
291  "final_collector": [
292  "b2hlt_finalcollector", "--input", f"tcp://*:{self.final_collector_input_port}",
293  "--output", f"tcp://localhost:{self.final_collector_output_port}",
294  "--monitor", f"tcp://*:{self.final_collector_monitoring_port}"
295  ],
296  "worker": [
297  "python3", basf2.find_file("daq/hbasf2/tests/passthrough.no_run_py"),
298  "--input", f"tcp://localhost:{self.distributor_output_port}",
299  "--output", f"tcp://localhost:{self.final_collector_input_port}"
300  ],
301  "dying_worker": [
302  "python3", basf2.find_file("daq/hbasf2/tests/passthrough.no_run_py"),
303  "--prefix", "dying_", "--exit",
304  "--input", f"tcp://localhost:{self.distributor_output_port}",
305  "--output", f"tcp://localhost:{self.final_collector_input_port}"
306  ],
307  }
308  super().setUp()
309 
310  def testFullRun(self):
311  """test function"""
312  distributor_monitoring_socket = self.create_socketcreate_socket(self.distributor_monitoring_portdistributor_monitoring_port)
313  final_collector_monitoring_socket = self.create_socketcreate_socket(self.final_collector_monitoring_portfinal_collector_monitoring_port)
314 
315  input_socket = self.create_socketcreate_socket(self.distributor_input_portdistributor_input_port, socket_type=zmq.STREAM, bind=True)
316  input_identity, _ = self.recvrecv(input_socket)
317 
318  output_socket = self.create_socketcreate_socket(self.final_collector_output_portfinal_collector_output_port, socket_type=zmq.STREAM, bind=True)
319  output_identity, _ = self.recvrecv(output_socket)
320 
321  # At the beginning, everything should be at normal state
322  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
323  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
324  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
325  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 40)
326  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "output.registered_workers", 2)
327 
328  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 2)
329 
330  self.assertHasOutputFileassertHasOutputFile("initialize_called", timeout=1)
331  self.assertHasOutputFileassertHasOutputFile("dying_initialize_called", timeout=1)
332 
333  # Now lets try sending some events
334  for _ in range(100):
335  input_socket.send_multipart([input_identity, self.event_dataevent_data])
336 
337  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
338  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
339  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
340  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 40)
341  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "output.registered_workers", 2)
342  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "output.sent_events", 100)
343 
344  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 2)
345  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.received_events", 100)
346  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "output.sent_events", 100)
347 
348  self.assertHasOutputFileassertHasOutputFile("beginrun_called", timeout=1)
349  self.assertHasOutputFileassertHasOutputFile("dying_beginrun_called", timeout=1)
350 
351  buffer = b""
352  while output_socket.poll(0):
353  _, msg = self.recvrecv(output_socket)
354  buffer += msg
355  self.assertNothingMoreassertNothingMore(output_socket)
356 
357  # Data Size != raw data, as data is in different format, size taken from test itself
358  self.assertEqual(len(buffer), 122638 * 100)
359 
360  # Now we kill one of the workers on purpose
361  Path("dying_exit_request").touch()
362 
363  # And send some more events
364  for _ in range(100):
365  input_socket.send_multipart([input_identity, self.event_dataevent_data])
366 
367  self.assertHasOutputFileassertHasOutputFile("dying_exit_called", timeout=1)
368  self.assertIsDownassertIsDown("dying_worker", timeout=10)
369 
370  buffer = b""
371  while output_socket.poll(0):
372  _, msg = self.recvrecv(output_socket)
373  buffer += msg
374  self.assertNothingMoreassertNothingMore(output_socket)
375 
376  # We expect to have at least some events (but some will be missing)!
377  # 100 - 20 (queue size of dying worker) - 1 (the event that "caused" the problem) = 79
378  self.assertEqual(len(buffer), 122638 * 79)
379 
380  # The collector should have it removed
381  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
382 
383  # Also a stop signal should be transported correctly
384  self.sendsend(distributor_monitoring_socket, "l")
385 
386  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
387  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
388  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
389  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
390  # the distributor does not know about the dying worker, but this is also no problem
391  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "output.registered_workers", 2)
392  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "output.sent_events", 200)
393 
394  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
395  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.received_events", 179)
396  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.received_stop_messages", 1)
397  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.all_stop_messages", 1)
398 
399  # should go to the workers
400  self.assertHasOutputFileassertHasOutputFile("endrun_called", timeout=1)
401 
402  # Now lets restart the run
403  self.sendsend(distributor_monitoring_socket, "n")
404  self.sendsend(final_collector_monitoring_socket, "n")
405 
406  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
407  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
408  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
409  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
410  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "output.registered_workers", 2)
411 
412  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
413  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.received_events", 179)
414  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.received_stop_messages", 0)
415  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.all_stop_messages", 0)
416 
417  for _ in range(100):
418  input_socket.send_multipart([input_identity, self.event_dataevent_data])
419 
420  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
421  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
422  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
423  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
424  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "output.registered_workers", 2)
425  self.assertMonitoringassertMonitoring(distributor_monitoring_socket, "output.sent_events", 300)
426 
427  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
428  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "input.received_events", 279)
429  self.assertMonitoringassertMonitoring(final_collector_monitoring_socket, "output.sent_events", 279)
430 
431  self.assertNotHasOutputFileassertNotHasOutputFile("endrun_called", timeout=1)
432  self.assertHasOutputFileassertHasOutputFile("beginrun_called", timeout=1)
433 
434  buffer = b""
435  while output_socket.poll(0):
436  _, msg = self.recvrecv(output_socket)
437  buffer += msg
438  self.assertNothingMoreassertNothingMore(output_socket)
439 
440  # Now all events should be transported again
441  self.assertEqual(len(buffer), 122638 * 100)
442 
443  # And finally: terminate everything
444  self.sendsend(distributor_monitoring_socket, "x")
445 
446  self.assertIsDownassertIsDown("final_collector")
447  self.assertIsDownassertIsDown("distributor")
448  self.assertIsDownassertIsDown("worker")
449 
450 
if __name__ == '__main__':
    # The tests are repeated ZMQ_TEST_FOR_LOOPS times and up to
    # ZMQ_TEST_MAX_FAILURES failed repetitions are tolerated.
    number_of_failures = 0

    for _ in range(ZMQ_TEST_FOR_LOOPS):
        try:
            # With exit=False unittest does not call sys.exit(); it returns the
            # test program, whose ``result`` holds the recorded failures/errors.
            test_program = main(exit=False)
        except AssertionError:
            number_of_failures += 1
        else:
            # Failing tests are recorded in the result instead of being raised,
            # so the outcome has to be inspected explicitly.
            if not test_program.result.wasSuccessful():
                number_of_failures += 1

    message = f'Number of failed for loops: {number_of_failures}/{ZMQ_TEST_FOR_LOOPS}'
    if number_of_failures <= ZMQ_TEST_MAX_FAILURES:
        basf2.B2INFO(message)
    else:
        basf2.B2FATAL(message)
final_collector_input_port
final_collector_input_port
Definition: test_hlt.py:277
distributor_monitoring_port
distributor_monitoring_port
Definition: test_hlt.py:274
needed_programs
needed_programs
Definition: test_hlt.py:284
final_collector_output_port
final_collector_output_port
Definition: test_hlt.py:279
distributor_output_port
distributor_output_port
Definition: test_hlt.py:272
distributor_input_port
distributor_input_port
Definition: test_hlt.py:270
final_collector_monitoring_port
final_collector_monitoring_port
Definition: test_hlt.py:281
final_collector_input_port
final_collector_input_port
Definition: test_hlt.py:40
def setUp(self)
Definition: test_hlt.py:23
distributor_monitoring_port
distributor_monitoring_port
Definition: test_hlt.py:30
needed_programs
needed_programs
Definition: test_hlt.py:47
collector_input_port
collector_input_port
Definition: test_hlt.py:33
final_collector_output_port
final_collector_output_port
Definition: test_hlt.py:42
distributor_output_port
distributor_output_port
Definition: test_hlt.py:28
def testFullRun(self)
Definition: test_hlt.py:80
distributor_input_port
distributor_input_port
Definition: test_hlt.py:26
collector_output_port
collector_output_port
Definition: test_hlt.py:35
event_data
event_data
Definition: test_hlt.py:21
collector_monitoring_port
collector_monitoring_port
Definition: test_hlt.py:37
final_collector_monitoring_port
final_collector_monitoring_port
Definition: test_hlt.py:44
def create_socket(port, socket_type=zmq.DEALER, identity="socket", bind=False)
needed_programs
The dict name -> cmd args of the programs to start, needs to be set in each test.
Definition: test_support.py:38
def assertMonitoring(self, socket, search_key, search_value, timeout=10)
def assertHasOutputFile(self, output_file, unlink=True, timeout=0.5, minimum_delay=0.1)
def send(socket, message_type, first_data=b"", second_data=b"", identity="")
def assertNotHasOutputFile(self, output_file, timeout=0.5)
def assertIsDown(self, name, timeout=5, minimum_delay=0.1)
Definition: test_support.py:93
Definition: main.py:1