Belle II Software development
test_hlt.py
1
8
9from pathlib import Path
10from unittest import main
11import basf2
12
13import zmq
14
15from zmq_daq.test_support import HLTZMQTestCase, ZMQ_TEST_FOR_LOOPS, ZMQ_TEST_MAX_FAILURES
16
17
19 """Test case"""
20
21 event_data = open(basf2.find_file("daq/hbasf2/tests/out.raw"), "br").read()
22
23 def setUp(self):
24 """Setup port numbers and necessary programs"""
25
26 self.distributor_input_port = HLTZMQTestCase.get_free_port()
27
28 self.distributor_output_port = HLTZMQTestCase.get_free_port()
29
30 self.distributor_monitoring_port = HLTZMQTestCase.get_free_port()
31
32
33 self.collector_input_port = HLTZMQTestCase.get_free_port()
34
35 self.collector_output_port = HLTZMQTestCase.get_free_port()
36
37 self.collector_monitoring_port = HLTZMQTestCase.get_free_port()
38
39
40 self.final_collector_input_port = HLTZMQTestCase.get_free_port()
41
42 self.final_collector_output_port = HLTZMQTestCase.get_free_port()
43
44 self.final_collector_monitoring_port = HLTZMQTestCase.get_free_port()
45
46
48 "distributor": [
49 "b2hlt_distributor",
50 "--input", f"tcp://localhost:{self.distributor_input_port}",
51 "--output", f"tcp://*:{self.distributor_output_port}",
52 "--monitor", f"tcp://*:{self.distributor_monitoring_port}"
53 ],
54 "collector": [
55 "b2hlt_collector",
56 "--input", f"tcp://*:{self.collector_input_port}",
57 "--output", f"tcp://*:{self.collector_output_port}",
58 "--monitor", f"tcp://*:{self.collector_monitoring_port}"
59 ],
60 "final_collector": [
61 "b2hlt_finalcollector",
62 "--input", f"tcp://*:{self.final_collector_input_port}",
63 "--output", f"tcp://localhost:{self.final_collector_output_port}",
64 "--monitor", f"tcp://*:{self.final_collector_monitoring_port}"
65 ],
66 "worker": [
67 "python3", basf2.find_file("daq/hbasf2/tests/passthrough.no_run_py"),
68 "--input", f"tcp://localhost:{self.distributor_output_port}",
69 "--output", f"tcp://localhost:{self.collector_input_port}"
70 ],
71 "output_worker": [
72 "python3", basf2.find_file("daq/hbasf2/tests/passthrough.no_run_py"),
73 "--prefix", "output_",
74 "--input", f"tcp://localhost:{self.collector_output_port}",
75 "--output", f"tcp://localhost:{self.final_collector_input_port}"
76 ],
77 }
78 super().setUp()
79
80 def testFullRun(self):
81 """test function"""
82 distributor_monitoring_socket = self.create_socket(self.distributor_monitoring_port)
83 collector_monitoring_socket = self.create_socket(self.collector_monitoring_port)
84 final_collector_monitoring_socket = self.create_socket(self.final_collector_monitoring_port)
85
86 input_socket = self.create_socket(self.distributor_input_port, socket_type=zmq.STREAM, bind=True)
87 input_identity, _ = self.recv(input_socket)
88
89 output_socket = self.create_socket(self.final_collector_output_port, socket_type=zmq.STREAM, bind=True)
90 output_identity, _ = self.recv(output_socket)
91
92 # At the beginning, everything should be at normal state
93 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
94 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
95 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
96 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
97 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 1)
98
99 self.assertMonitoring(collector_monitoring_socket, "input.registered_workers", 1)
100 self.assertMonitoring(collector_monitoring_socket, "output.ready_queue_size", 20)
101 self.assertMonitoring(collector_monitoring_socket, "output.registered_workers", 1)
102
103 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
104
105 self.assertHasOutputFile("initialize_called", timeout=1)
106 self.assertHasOutputFile("output_initialize_called", timeout=1)
107
108 # Now lets try sending some events
109 for _ in range(20):
110 input_socket.send_multipart([input_identity, self.event_data])
111
112 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
113 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
114 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
115 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
116 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 1)
117 self.assertMonitoring(distributor_monitoring_socket, "output.sent_events", 20)
118
119 self.assertMonitoring(collector_monitoring_socket, "input.registered_workers", 1)
120 self.assertMonitoring(collector_monitoring_socket, "input.received_events", 20)
121 self.assertMonitoring(collector_monitoring_socket, "output.ready_queue_size", 20)
122 self.assertMonitoring(collector_monitoring_socket, "output.registered_workers", 1)
123 self.assertMonitoring(collector_monitoring_socket, "output.sent_events", 20)
124
125 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
126 self.assertMonitoring(final_collector_monitoring_socket, "input.received_events", 20)
127 self.assertMonitoring(final_collector_monitoring_socket, "output.sent_events", 20)
128
129 self.assertHasOutputFile("beginrun_called", timeout=1)
130 self.assertHasOutputFile("output_beginrun_called", timeout=1)
131
132 buffer = b""
133 while output_socket.poll(0):
134 _, msg = self.recv(output_socket)
135 buffer += msg
136 self.assertNothingMore(output_socket)
137
138 # Data Size != raw data, as data is in different format, size taken from test itself
139 self.assertEqual(len(buffer), 122638 * 20)
140
141 # Now we stop this run
142 self.send(distributor_monitoring_socket, "l")
143
144 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
145 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
146 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
147 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
148 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 1)
149
150 self.assertMonitoring(collector_monitoring_socket, "input.registered_workers", 1)
151 self.assertMonitoring(collector_monitoring_socket, "input.received_events", 20)
152 self.assertMonitoring(collector_monitoring_socket, "output.ready_queue_size", 20)
153 self.assertMonitoring(collector_monitoring_socket, "output.registered_workers", 1)
154 self.assertMonitoring(collector_monitoring_socket, "input.received_stop_messages", 1)
155 self.assertMonitoring(collector_monitoring_socket, "input.all_stop_messages", 1)
156
157 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
158 self.assertMonitoring(final_collector_monitoring_socket, "input.received_events", 20)
159 self.assertMonitoring(final_collector_monitoring_socket, "input.received_stop_messages", 1)
160 self.assertMonitoring(final_collector_monitoring_socket, "input.all_stop_messages", 1)
161
162 # should go to the workers
163 self.assertHasOutputFile("endrun_called", timeout=1)
164 self.assertHasOutputFile("output_endrun_called", timeout=1)
165
166 # but not to the output
167 self.assertNothingMore(output_socket)
168
169 # a second stop message should only reach the first workers up to the collector
170 self.send(distributor_monitoring_socket, "l")
171
172 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
173 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
174 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
175 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
176 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 1)
177
178 self.assertMonitoring(collector_monitoring_socket, "input.registered_workers", 1)
179 self.assertMonitoring(collector_monitoring_socket, "input.received_events", 20)
180 self.assertMonitoring(collector_monitoring_socket, "output.ready_queue_size", 20)
181 self.assertMonitoring(collector_monitoring_socket, "output.registered_workers", 1)
182 self.assertMonitoring(collector_monitoring_socket, "input.received_stop_messages", 1)
183 self.assertMonitoring(collector_monitoring_socket, "input.all_stop_messages", 1)
184
185 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
186 self.assertMonitoring(final_collector_monitoring_socket, "input.received_events", 20)
187 self.assertMonitoring(final_collector_monitoring_socket, "input.received_stop_messages", 1)
188 self.assertMonitoring(final_collector_monitoring_socket, "input.all_stop_messages", 1)
189
190 # should go to the workers (longer timeout as distributor waits a bit)
191 self.assertHasOutputFile("endrun_called", timeout=5)
192 self.assertNotHasOutputFile("output_endrun_called", timeout=1)
193
194 # Now lets restart the run
195 self.send(distributor_monitoring_socket, "n")
196 self.send(collector_monitoring_socket, "n")
197 self.send(final_collector_monitoring_socket, "n")
198
199 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
200 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
201 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
202 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
203 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 1)
204
205 self.assertMonitoring(collector_monitoring_socket, "input.registered_workers", 1)
206 self.assertMonitoring(collector_monitoring_socket, "input.received_events", 20)
207 self.assertMonitoring(collector_monitoring_socket, "output.ready_queue_size", 20)
208 self.assertMonitoring(collector_monitoring_socket, "output.registered_workers", 1)
209 self.assertMonitoring(collector_monitoring_socket, "input.received_stop_messages", 0)
210 self.assertMonitoring(collector_monitoring_socket, "input.all_stop_messages", 0)
211
212 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
213 self.assertMonitoring(final_collector_monitoring_socket, "input.received_events", 20)
214 self.assertMonitoring(final_collector_monitoring_socket, "input.received_stop_messages", 0)
215 self.assertMonitoring(final_collector_monitoring_socket, "input.all_stop_messages", 0)
216
217 # And send some more events
218 for _ in range(20):
219 input_socket.send_multipart([input_identity, self.event_data])
220
221 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
222 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
223 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
224 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
225 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 1)
226 self.assertMonitoring(distributor_monitoring_socket, "output.sent_events", 40)
227
228 self.assertMonitoring(collector_monitoring_socket, "input.registered_workers", 1)
229 self.assertMonitoring(collector_monitoring_socket, "input.received_events", 40)
230 self.assertMonitoring(collector_monitoring_socket, "output.ready_queue_size", 20)
231 self.assertMonitoring(collector_monitoring_socket, "output.registered_workers", 1)
232 self.assertMonitoring(collector_monitoring_socket, "output.sent_events", 40)
233
234 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
235 self.assertMonitoring(final_collector_monitoring_socket, "input.received_events", 40)
236 self.assertMonitoring(final_collector_monitoring_socket, "output.sent_events", 40)
237
238 self.assertNotHasOutputFile("endrun_called", timeout=1)
239 self.assertNotHasOutputFile("output_endrun_called", timeout=1)
240 self.assertHasOutputFile("beginrun_called", timeout=1)
241 self.assertHasOutputFile("output_beginrun_called", timeout=1)
242
243 buffer = b""
244 while output_socket.poll(0):
245 _, msg = self.recv(output_socket)
246 buffer += msg
247 self.assertNothingMore(output_socket)
248
249 # Data Size != raw data, as data is in different format, size taken from test itself
250 self.assertEqual(len(buffer), 122638 * 20)
251
252 # And finally: terminate everything
253 self.send(distributor_monitoring_socket, "x")
254
255 self.assertIsDown("collector")
256 self.assertIsDown("final_collector")
257 self.assertIsDown("distributor")
258 self.assertIsDown("worker", timeout=1)
259 self.assertIsDown("output_worker")
260
261
263 """Test case"""
264
265 event_data = open(basf2.find_file("daq/hbasf2/tests/out.raw"), "br").read()
266
267 def setUp(self):
268 """Setup port numbers and necessary programs"""
269
270 self.distributor_input_port = HLTZMQTestCase.get_free_port()
271
272 self.distributor_output_port = HLTZMQTestCase.get_free_port()
273
274 self.distributor_monitoring_port = HLTZMQTestCase.get_free_port()
275
276
277 self.final_collector_input_port = HLTZMQTestCase.get_free_port()
278
279 self.final_collector_output_port = HLTZMQTestCase.get_free_port()
280
281 self.final_collector_monitoring_port = HLTZMQTestCase.get_free_port()
282
283
285 "distributor": [
286 "b2hlt_distributor",
287 "--input", f"tcp://localhost:{self.distributor_input_port}",
288 "--output", f"tcp://*:{self.distributor_output_port}",
289 "--monitor", f"tcp://*:{self.distributor_monitoring_port}"
290 ],
291 "final_collector": [
292 "b2hlt_finalcollector", "--input", f"tcp://*:{self.final_collector_input_port}",
293 "--output", f"tcp://localhost:{self.final_collector_output_port}",
294 "--monitor", f"tcp://*:{self.final_collector_monitoring_port}"
295 ],
296 "worker": [
297 "python3", basf2.find_file("daq/hbasf2/tests/passthrough.no_run_py"),
298 "--input", f"tcp://localhost:{self.distributor_output_port}",
299 "--output", f"tcp://localhost:{self.final_collector_input_port}"
300 ],
301 "dying_worker": [
302 "python3", basf2.find_file("daq/hbasf2/tests/passthrough.no_run_py"),
303 "--prefix", "dying_", "--exit",
304 "--input", f"tcp://localhost:{self.distributor_output_port}",
305 "--output", f"tcp://localhost:{self.final_collector_input_port}"
306 ],
307 }
308 super().setUp()
309
310 def testFullRun(self):
311 """test function"""
312 distributor_monitoring_socket = self.create_socket(self.distributor_monitoring_port)
313 final_collector_monitoring_socket = self.create_socket(self.final_collector_monitoring_port)
314
315 input_socket = self.create_socket(self.distributor_input_port, socket_type=zmq.STREAM, bind=True)
316 input_identity, _ = self.recv(input_socket)
317
318 output_socket = self.create_socket(self.final_collector_output_port, socket_type=zmq.STREAM, bind=True)
319 output_identity, _ = self.recv(output_socket)
320
321 # At the beginning, everything should be at normal state
322 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
323 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
324 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
325 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 40)
326 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 2)
327
328 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 2)
329
330 self.assertHasOutputFile("initialize_called", timeout=1)
331 self.assertHasOutputFile("dying_initialize_called", timeout=1)
332
333 # Now lets try sending some events
334 for _ in range(100):
335 input_socket.send_multipart([input_identity, self.event_data])
336
337 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
338 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
339 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
340 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 40)
341 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 2)
342 self.assertMonitoring(distributor_monitoring_socket, "output.sent_events", 100)
343
344 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 2)
345 self.assertMonitoring(final_collector_monitoring_socket, "input.received_events", 100)
346 self.assertMonitoring(final_collector_monitoring_socket, "output.sent_events", 100)
347
348 self.assertHasOutputFile("beginrun_called", timeout=1)
349 self.assertHasOutputFile("dying_beginrun_called", timeout=1)
350
351 buffer = b""
352 while output_socket.poll(0):
353 _, msg = self.recv(output_socket)
354 buffer += msg
355 self.assertNothingMore(output_socket)
356
357 # Data Size != raw data, as data is in different format, size taken from test itself
358 self.assertEqual(len(buffer), 122638 * 100)
359
360 # Now we kill one of the workers on purpose
361 Path("dying_exit_request").touch()
362
363 # And send some more events
364 for _ in range(100):
365 input_socket.send_multipart([input_identity, self.event_data])
366
367 self.assertHasOutputFile("dying_exit_called", timeout=1)
368 self.assertIsDown("dying_worker", timeout=10)
369
370 buffer = b""
371 while output_socket.poll(0):
372 _, msg = self.recv(output_socket)
373 buffer += msg
374 self.assertNothingMore(output_socket)
375
376 # We expect to have at least some events (but some will be missing)!
377 # 100 - 20 (queue size of dying worker) - 1 (the event that "caused" the problem) = 79
378 self.assertEqual(len(buffer), 122638 * 79)
379
380 # The collector should have it removed
381 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
382
383 # Also a stop signal should be transported correctly
384 self.send(distributor_monitoring_socket, "l")
385
386 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
387 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
388 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
389 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
390 # the distributor does not know about the dying worker, but this is also no problem
391 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 2)
392 self.assertMonitoring(distributor_monitoring_socket, "output.sent_events", 200)
393
394 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
395 self.assertMonitoring(final_collector_monitoring_socket, "input.received_events", 179)
396 self.assertMonitoring(final_collector_monitoring_socket, "input.received_stop_messages", 1)
397 self.assertMonitoring(final_collector_monitoring_socket, "input.all_stop_messages", 1)
398
399 # should go to the workers
400 self.assertHasOutputFile("endrun_called", timeout=1)
401
402 # Now lets restart the run
403 self.send(distributor_monitoring_socket, "n")
404 self.send(final_collector_monitoring_socket, "n")
405
406 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
407 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
408 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
409 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
410 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 2)
411
412 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
413 self.assertMonitoring(final_collector_monitoring_socket, "input.received_events", 179)
414 self.assertMonitoring(final_collector_monitoring_socket, "input.received_stop_messages", 0)
415 self.assertMonitoring(final_collector_monitoring_socket, "input.all_stop_messages", 0)
416
417 for _ in range(100):
418 input_socket.send_multipart([input_identity, self.event_data])
419
420 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
421 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
422 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
423 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
424 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 2)
425 self.assertMonitoring(distributor_monitoring_socket, "output.sent_events", 300)
426
427 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
428 self.assertMonitoring(final_collector_monitoring_socket, "input.received_events", 279)
429 self.assertMonitoring(final_collector_monitoring_socket, "output.sent_events", 279)
430
431 self.assertNotHasOutputFile("endrun_called", timeout=1)
432 self.assertHasOutputFile("beginrun_called", timeout=1)
433
434 buffer = b""
435 while output_socket.poll(0):
436 _, msg = self.recv(output_socket)
437 buffer += msg
438 self.assertNothingMore(output_socket)
439
440 # Now all events should be transported again
441 self.assertEqual(len(buffer), 122638 * 100)
442
443 # And finally: terminate everything
444 self.send(distributor_monitoring_socket, "x")
445
446 self.assertIsDown("final_collector")
447 self.assertIsDown("distributor")
448 self.assertIsDown("worker")
449
450
451if __name__ == '__main__':
452
453 number_of_failures = 0
454
455 for i in range(ZMQ_TEST_FOR_LOOPS):
456 try:
457 main(exit=False)
458 except AssertionError:
459 number_of_failures += 1
460
461
462 message = f'Number of failed for loops: {number_of_failures}/{ZMQ_TEST_FOR_LOOPS}'
463 if number_of_failures <= ZMQ_TEST_MAX_FAILURES:
464 basf2.B2INFO(message)
465 else:
466 basf2.B2FATAL(message)
final_collector_input_port
final_collector_input_port
Definition: test_hlt.py:277
distributor_monitoring_port
distributor_monitoring_port
Definition: test_hlt.py:274
needed_programs
needed_programs
Definition: test_hlt.py:284
final_collector_output_port
final_collector_output_port
Definition: test_hlt.py:279
distributor_output_port
distributor_output_port
Definition: test_hlt.py:272
distributor_input_port
distributor_input_port
Definition: test_hlt.py:270
final_collector_monitoring_port
final_collector_monitoring_port
Definition: test_hlt.py:281
open event_data
event_data
Definition: test_hlt.py:265
final_collector_input_port
final_collector_input_port
Definition: test_hlt.py:40
def setUp(self)
Definition: test_hlt.py:23
distributor_monitoring_port
distributor_monitoring_port
Definition: test_hlt.py:30
needed_programs
needed_programs
Definition: test_hlt.py:47
collector_input_port
collector_input_port
Definition: test_hlt.py:33
final_collector_output_port
final_collector_output_port
Definition: test_hlt.py:42
distributor_output_port
distributor_output_port
Definition: test_hlt.py:28
def testFullRun(self)
Definition: test_hlt.py:80
distributor_input_port
distributor_input_port
Definition: test_hlt.py:26
collector_output_port
collector_output_port
Definition: test_hlt.py:35
collector_monitoring_port
collector_monitoring_port
Definition: test_hlt.py:37
final_collector_monitoring_port
final_collector_monitoring_port
Definition: test_hlt.py:44
open event_data
event_data
Definition: test_hlt.py:21
dict needed_programs
The dict name -> cmd args of the programs to start, needs to be set in each test.
Definition: test_support.py:38
def create_socket(port, socket_type=zmq.DEALER, identity="socket", bind=False)
def assertMonitoring(self, socket, search_key, search_value, timeout=10)
def assertHasOutputFile(self, output_file, unlink=True, timeout=0.5, minimum_delay=0.1)
def send(socket, message_type, first_data=b"", second_data=b"", identity="")
def assertNotHasOutputFile(self, output_file, timeout=0.5)
def assertIsDown(self, name, timeout=5, minimum_delay=0.1)
Definition: test_support.py:93
Definition: main.py:1