Belle II Software development
CollectorTestCase Class Reference
Inheritance diagram for CollectorTestCase:
BaseCollectorTestCase HLTZMQTestCase

Public Member Functions

 setUp (self)
 
 create_output_socket (self)
 
 get_signal (self, output_socket, signal_type)
 
 get_event (self, output_socket)
 
 testHelloAndMessageTransmission (self)
 
 testWrongRegistration (self)
 
 testEventPropagation (self)
 
 tearDown (self)
 
 assertIsDown (self, name, timeout=5, minimum_delay=0.1)
 
 assertIsRunning (self, name)
 
 assertMonitoring (self, socket, search_key, search_value, timeout=10)
 
 assertIsAndGet (self, socket, message_type, final=True, router=False)
 
 assertIsMsgType (self, socket, message_type, final=True, router=False)
 
 assertNothingMore (self, socket)
 
 assertHasOutputFile (self, output_file, unlink=True, timeout=0.5, minimum_delay=0.1)
 
 assertNotHasOutputFile (self, output_file, timeout=0.5)
 

Static Public Member Functions

 get_free_port ()
 
 create_socket (port, socket_type=zmq.DEALER, identity="socket", bind=False)
 
 create_router_socket (port)
 
 send (socket, message_type, first_data=b"", second_data=b"", identity="")
 
 recv (socket)
 

Public Attributes

 input_port = HLTZMQTestCase.get_free_port()
 input_port
 
 output_port = HLTZMQTestCase.get_free_port()
 output_port
 
 monitoring_port = HLTZMQTestCase.get_free_port()
 monitoring_port
 
 test_dir = tempfile.mkdtemp()
 use a temporary folder for testing
 
 previous_dir = os.getcwd()
 remember current working directory
 
 started_programs = dict()
 dict for all started programs
 
 tearDown = subprocess.Popen(command, start_new_session=True)
 

Static Public Attributes

bool final_collector = False
 final_collector
 
 ctx = zmq.Context()
 The ZMQ context.
 
 needed_programs = dict()
 The dict name -> cmd args of the programs to start, needs to be set in each test.
 

Protected Member Functions

 _is_running (self, name)
 

Detailed Description

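Test case for the regular (non-final) collector program `b2hlt_collector`, i.e. with `final_collector` left at False.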

Definition at line 14 of file test_collector.py.

Member Function Documentation

◆ _is_running()

_is_running ( self,
name )
protectedinherited
Check if a given program is still running.

Definition at line 84 of file test_support.py.

84 def _is_running(self, name):
85 """
86 Check if a given program is still running.
87 """
88 process = self.started_programs[name]
89 pid, sts = process._try_wait(os.WNOHANG)
90 assert pid == process.pid or pid == 0
91 return pid == 0
92

◆ assertHasOutputFile()

assertHasOutputFile ( self,
output_file,
unlink = True,
timeout = 0.5,
minimum_delay = 0.1 )
inherited
Assert that the output file is present at the latest once the given timeout has elapsed; the check is repeated every `minimum_delay` seconds. If unlink is set to True, remove the file after checking.

Definition at line 227 of file test_support.py.

227 def assertHasOutputFile(self, output_file, unlink=True, timeout=0.5, minimum_delay=0.1):
228 """
229 Assert that - at least after the given timeout - the output file
230 is present. If unlink is set to True, remove the file after checking.
231 """
232 endtime = time() + timeout
233
234 while True:
235 if os.path.exists(output_file):
236 if unlink:
237 os.unlink(output_file)
238 return
239
240 remaining = endtime - time()
241 self.assertFalse(remaining <= 0)
242
243 sleep(minimum_delay)
244

◆ assertIsAndGet()

assertIsAndGet ( self,
socket,
message_type,
final = True,
router = False )
inherited
Assert that the next message received on the socket has the given message type. If final is set to True, also assert that there is no additional message on the socket. Use router only for router sockets.

Definition at line 200 of file test_support.py.

200 def assertIsAndGet(self, socket, message_type, final=True, router=False):
201 """
202 Assert that the next message received on the socket has the given message type.
203 If final is set to True, also assert that there is no additional message on the socket.
204 Use router only for router sockets.
205 """
206 answer = HLTZMQTestCase.recv(socket)
207 type_index = 0
208 if router:
209 type_index = 1
210 self.assertEqual(answer[type_index], message_type.encode())
211 if final:
212 self.assertNothingMore(socket)
213 return answer
214

◆ assertIsDown()

assertIsDown ( self,
name,
timeout = 5,
minimum_delay = 0.1 )
inherited
Test helper to assert that the given program has terminated at the latest after `timeout` seconds have passed. Checks every `minimum_delay` seconds.

Definition at line 93 of file test_support.py.

93 def assertIsDown(self, name, timeout=5, minimum_delay=0.1):
94 """
95 Test helper to assert the given program has terminated - at least after timeout in seconds has passed.
96 Checks every "minimal_delay seconds.
97 """
98 endtime = time() + timeout
99 while True:
100 if not self._is_running(name):
101 return
102
103 remaining = endtime - time()
104 self.assertFalse(remaining <= 0)
105
106 sleep(minimum_delay)
107

◆ assertIsMsgType()

assertIsMsgType ( self,
socket,
message_type,
final = True,
router = False )
inherited
Deprecated copy of "assertIsAndGet".

Definition at line 215 of file test_support.py.

215 def assertIsMsgType(self, socket, message_type, final=True, router=False):
216 """
217 Deprecated copy of "assertIsAndGet".
218 """
219 return self.assertIsAndGet(socket, message_type, final=final, router=router)
220

◆ assertIsRunning()

assertIsRunning ( self,
name )
inherited
Assert that a given program is still running.

Definition at line 108 of file test_support.py.

108 def assertIsRunning(self, name):
109 """
110 Assert that a given program is still running.
111 """
112 self.assertTrue(self._is_running(name))
113

◆ assertMonitoring()

assertMonitoring ( self,
socket,
search_key,
search_value,
timeout = 10 )
inherited
Ask the given socket for a monitoring JSON and make sure that the value related to "search_key" is set to "search_value" at the latest once the given timeout has elapsed. The search key should be in the form "<category>.<key>".

Definition at line 169 of file test_support.py.

169 def assertMonitoring(self, socket, search_key, search_value, timeout=10):
170 """
171 Ask the given socket for a monitoring JSON and make sure, the value related to "search_key"
172 is set to "search_value" - at least after the given timeout.
173 The search key can should be in the form "<category>.<key>".
174 """
175 end_time = time() + timeout
176 monitoring = dict()
177 while time() < end_time:
178 HLTZMQTestCase.send(socket, "m")
179 answer = self.assertIsAndGet(socket, "c")
180
181 dict_monitoring = json.loads(answer[1])
182 for parent_key, parent_dict in dict_monitoring.items():
183 for key, value in parent_dict.items():
184 monitoring[parent_key + "." + key] = value
185
186 if search_key in monitoring and monitoring[search_key] == search_value:
187 break
188 else:
189 if monitoring:
190 if search_key not in monitoring:
191 raise AssertionError(f"Monitoring did not have a result with key {search_key}")
192 else:
193 raise AssertionError(
194 f"Monitoring did not show the result {search_value} for {search_key}, instead {monitoring[search_key]}")
195 else:
196 raise AssertionError("Monitoring did not answer in time.")
197
198 self.assertNothingMore(socket)
199

◆ assertNotHasOutputFile()

assertNotHasOutputFile ( self,
output_file,
timeout = 0.5 )
inherited
Assert that after the timeout the given file is not present (i.e. no process has created it).

Definition at line 245 of file test_support.py.

245 def assertNotHasOutputFile(self, output_file, timeout=0.5):
246 """
247 Assert that after the timeout the given file is not present
248 (a.k.a. no process has created it)
249 """
250 sleep(timeout)
251 self.assertFalse(os.path.exists(output_file))
252
253

◆ assertNothingMore()

assertNothingMore ( self,
socket )
inherited
Assert that there is no pending message to be received on the socket.

Definition at line 221 of file test_support.py.

221 def assertNothingMore(self, socket):
222 """
223 Assert that there is no pending message to be received on the socket.
224 """
225 self.assertFalse(socket.poll(0))
226

◆ create_output_socket()

create_output_socket ( self)
inherited
Create the output socket; how it is set up depends on whether this is a final collector or not.

Definition at line 283 of file test_support.py.

283 def create_output_socket(self):
284 """create the output socket depending if final collector or not"""
285 if self.final_collector:
286 output_socket = self.create_socket(self.output_port, socket_type=zmq.STREAM, bind=True)
287 output_socket.send(b"")
288 self.recv(output_socket)
289 else:
290 output_socket = self.create_socket(self.output_port)
291 self.send(output_socket, "r")
292 return output_socket
293

◆ create_router_socket()

create_router_socket ( port)
staticinherited
Shortcut to create a ROUTER type socket with the typical parameters binding to the given port.

Definition at line 140 of file test_support.py.

140 def create_router_socket(port):
141 """
142 Shortcut to create a ROUTER type socket with the typical parameters
143 binding to the given port.
144 """
145 return HLTZMQTestCase.create_socket(port, socket_type=zmq.ROUTER, identity="", bind=True)
146

◆ create_socket()

create_socket ( port,
socket_type = zmq.DEALER,
identity = "socket",
bind = False )
staticinherited
Create and return a ZMQ socket with the given type and identity and bind or connect it to localhost and the given port.

Definition at line 115 of file test_support.py.

115 def create_socket(port, socket_type=zmq.DEALER, identity="socket", bind=False):
116 """
117 Create and return a ZMQ socket with the given type and identity and
118 bind or connect it to localhost and the given port.
119 """
120 socket = HLTZMQTestCase.ctx.socket(socket_type)
121 socket.rcvtimeo = 10000
122 socket.linger = 0
123 if identity:
124 socket.setsockopt_string(zmq.IDENTITY, identity)
125 if bind:
126 if port is None:
127 port = socket.bind_to_random_port("tcp://*")
128 return socket, port
129 else:
130 socket.bind(f"tcp://*:{port}")
131 else:
132 if port is None:
133 raise RuntimeError("Cannot connect to unknown port")
134
135 socket.connect(f"tcp://localhost:{port}")
136
137 return socket
138

◆ get_event()

get_event ( self,
output_socket )
inherited
Get an event from the socket; how it is received depends on whether this is a final collector or not.

Definition at line 301 of file test_support.py.

301 def get_event(self, output_socket):
302 """get an event from the socket depending if final collector or not"""
303 if self.final_collector:
304 self.recv(output_socket)
305 self.assertNothingMore(output_socket)
306 else:
307 self.assertIsMsgType(output_socket, "u")
308

◆ get_free_port()

get_free_port ( )
staticinherited
Get a free port number by reusing ZMQ's function for this.

Definition at line 41 of file test_support.py.

41 def get_free_port():
42 """
43 Get a free port number by reusing ZMQ's function for this.
44 """
45 socket = HLTZMQTestCase.ctx.socket(zmq.ROUTER)
46 port = socket.bind_to_random_port("tcp://*")
47 socket.close()
48 return port
49

◆ get_signal()

get_signal ( self,
output_socket,
signal_type )
inherited
Get a signal from the socket; what is expected depends on whether this is a final collector or not.

Definition at line 294 of file test_support.py.

294 def get_signal(self, output_socket, signal_type):
295 """get a signal from the socket depending if final collector or not"""
296 if self.final_collector:
297 self.assertNothingMore(output_socket)
298 else:
299 self.assertIsMsgType(output_socket, signal_type, final=True)
300

◆ recv()

recv ( socket)
staticinherited
Try to receive a message from the socket (or throw an assertion error if none comes after the set timeout of the socket).

Definition at line 159 of file test_support.py.

159 def recv(socket):
160 """
161 Try to receive a message from the socket (or throw an assertion error if none comes after the set timeout
162 of the socket).
163 """
164 try:
165 return socket.recv_multipart()
166 except zmq.error.Again:
167 raise AssertionError("No answer from socket")
168

◆ send()

send ( socket,
message_type,
first_data = b"",
second_data = b"",
identity = "" )
staticinherited
Send a message consisting of the message type, the first and the second data either to the identity if given or without identity if omitted.

Definition at line 148 of file test_support.py.

148 def send(socket, message_type, first_data=b"", second_data=b"", identity=""):
149 """
150 Send a message consisting of the message type, the first and the second data
151 either to the identity if given or without identity if omitted.
152 """
153 if identity:
154 socket.send_multipart([identity.encode(), message_type.encode(), first_data, second_data])
155 else:
156 socket.send_multipart([message_type.encode(), first_data, second_data])
157

◆ setUp()

setUp ( self)
inherited
Set up the port numbers and the necessary programs.

Reimplemented from HLTZMQTestCase.

Definition at line 261 of file test_support.py.

261 def setUp(self):
262 """Setup port numbers and necessary programs"""
263
264 self.input_port = HLTZMQTestCase.get_free_port()
265
266 self.output_port = HLTZMQTestCase.get_free_port()
267
268 self.monitoring_port = HLTZMQTestCase.get_free_port()
269
270 command = "b2hlt_finalcollector" if self.final_collector else "b2hlt_collector"
271 output = "localhost" if self.final_collector else "*"
272 self.needed_programs = {
273 "collector": [
274 command,
275 "--input", f"tcp://*:{self.input_port}",
276 "--output", f"tcp://{output}:{self.output_port}",
277 "--monitor", f"tcp://*:{self.monitoring_port}"
278 ]
279 }
280 # programs are setup, call parent setup function now
281 super().setUp()
282

◆ tearDown()

tearDown ( self)
inherited
Custom tearDown function to kill the started programs if still present and remove the temporary folder again.

Definition at line 70 of file test_support.py.

70 def tearDown(self):
71 """
72 Custom tearDown function to kill the started programs if still present
73 and remove the temporary folder again.
74 """
75 for name, process in self.started_programs.items():
76 if self._is_running(name):
77 os.killpg(process.pid, signal.SIGKILL)
78 process.wait()
79 os.chdir(self.previous_dir)
80 shutil.rmtree(self.test_dir)
81
82 atexit._clear()
83

◆ testEventPropagation()

testEventPropagation ( self)
inherited
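Test that events sent by the input workers are passed on to the ready output sockets, and that stop and terminate messages are only forwarded once every registered input worker has sent them.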

Definition at line 473 of file test_support.py.

473 def testEventPropagation(self):
474 """test function"""
475 monitoring_socket = self.create_socket(self.monitoring_port)
476
477 # Send two ready messages from the first socket
478 output_socket = self.create_output_socket()
479 if not self.final_collector:
480 self.send(output_socket, "r")
481
482 self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 2)
483 self.assertMonitoring(monitoring_socket, "output.registered_workers", 1)
484
485 if not self.final_collector:
486 # Send two ready message from the second socket (the last one is needed to keep the collector listening)
487 second_output_socket = self.create_socket(self.output_port, identity="other_socket")
488 self.send(second_output_socket, "r")
489 self.send(second_output_socket, "r")
490
491 self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 4)
492 self.assertMonitoring(monitoring_socket, "output.registered_workers", 2)
493
494 # Register two workers
495 input_socket = self.create_socket(self.input_port)
496 self.send(input_socket, "h")
497 self.assertIsMsgType(input_socket, "c")
498 self.assertMonitoring(monitoring_socket, "input.registered_workers", 1)
499
500 second_input_socket = self.create_socket(self.input_port, identity="other_socket")
501 self.send(second_input_socket, "h")
502 self.assertIsMsgType(second_input_socket, "c")
503 self.assertMonitoring(monitoring_socket, "input.registered_workers", 2)
504
505 # The first event should go to the first worker
506 self.send(input_socket, "u", b"event data")
507 self.assertIsMsgType(input_socket, "c")
508
509 self.get_event(output_socket)
510 if not self.final_collector:
511 self.assertNothingMore(second_output_socket)
512
513 # The second also
514 self.send(second_input_socket, "u", b"event data")
515 self.assertIsMsgType(second_input_socket, "c")
516
517 self.get_event(output_socket)
518 if not self.final_collector:
519 self.assertNothingMore(second_output_socket)
520
521 # But the third to the second worker
522 self.send(input_socket, "u", b"event data")
523 self.assertIsMsgType(input_socket, "c")
524
525 if not self.final_collector:
526 self.assertNothingMore(output_socket)
527 self.get_event(second_output_socket)
528 else:
529 self.get_event(output_socket)
530
531 # A stop message should be sent to all workers
532 self.send(input_socket, "l")
533 self.assertIsMsgType(input_socket, "c")
534
535 # But only if it is complete...
536 self.assertNothingMore(output_socket)
537 if not self.final_collector:
538 self.assertNothingMore(second_output_socket)
539
540 self.send(second_input_socket, "l")
541 self.assertIsMsgType(second_input_socket, "c")
542
543 self.get_signal(output_socket, "l")
544 if not self.final_collector:
545 self.get_signal(second_output_socket, "l")
546
547 # TODO: Test and implement: should not transmit events after stop
548
549 # As well as a terminate message
550 self.send(input_socket, "x")
551 self.assertIsMsgType(input_socket, "c")
552
553 self.assertNothingMore(output_socket)
554 if not self.final_collector:
555 self.assertNothingMore(second_output_socket)
556
557 self.send(second_input_socket, "x")
558 self.assertIsMsgType(second_input_socket, "c")
559
560 self.get_signal(output_socket, "x")
561 if not self.final_collector:
562 self.get_signal(second_output_socket, "x")
563
564 self.assertIsDown("collector")

◆ testHelloAndMessageTransmission()

testHelloAndMessageTransmission ( self)
inherited
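Test the registration of input workers (hello messages) and the handling of stop and terminate messages, including resetting the counters and unregistering a worker.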

Definition at line 309 of file test_support.py.

309 def testHelloAndMessageTransmission(self):
310 """test function"""
311 monitoring_socket = self.create_socket(self.monitoring_port)
312
313 # Register first worker
314 input_socket = self.create_socket(self.input_port)
315 self.send(input_socket, "h")
316
317 # However the collector is only polling for ready now...
318 self.assertNothingMore(input_socket)
319 self.assertMonitoring(monitoring_socket, "input.registered_workers", 0)
320
321 # To make it actually look for input messages, it needs at least a single worker
322 output_socket = self.create_output_socket()
323
324 # Now we can go on
325 self.assertIsMsgType(input_socket, "c")
326 self.assertMonitoring(monitoring_socket, "input.registered_workers", 1)
327
328 # Register second worker
329 second_input_socket = self.create_socket(self.input_port, identity="other_socket")
330 self.send(second_input_socket, "h")
331 self.assertIsMsgType(second_input_socket, "c")
332 self.assertMonitoring(monitoring_socket, "input.registered_workers", 2)
333
334 # So far no stop messages should be there
335 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 0)
336 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", False)
337
338 # the first stop message should not trigger a transmission
339 self.send(input_socket, "l")
340 self.assertIsMsgType(input_socket, "c")
341 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 1)
342 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", False)
343 self.assertNothingMore(output_socket)
344
345 # The second stop message should
346 self.send(second_input_socket, "l")
347 self.assertIsMsgType(second_input_socket, "c")
348 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 2)
349 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", True)
350
351 self.get_signal(output_socket, "l")
352
353 # Another stop message should not change anything
354 self.send(input_socket, "l")
355 self.assertIsMsgType(input_socket, "c")
356 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 2)
357 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", True)
358 self.assertNothingMore(output_socket)
359
360 # But if we reset...
361 self.send(monitoring_socket, "n")
362 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 0)
363 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", False)
364
365 # .. it should
366 self.send(input_socket, "l")
367 self.assertIsMsgType(input_socket, "c")
368 self.send(second_input_socket, "l")
369 self.assertIsMsgType(second_input_socket, "c")
370
371 self.get_signal(output_socket, "l")
372
373 # Now we reset again
374 self.send(monitoring_socket, "n")
375 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 0)
376 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", False)
377
378 # send just one stop message
379 self.send(input_socket, "l")
380 self.assertIsMsgType(input_socket, "c")
381
382 # and unregister the second
383 self.send(second_input_socket, "d", b"other_socket")
384 self.assertIsMsgType(second_input_socket, "c")
385 self.assertMonitoring(monitoring_socket, "input.registered_workers", 1)
386
387 # which should also give us a stop message
388 self.get_signal(output_socket, "l")
389 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 1)
390 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", True)
391
392 # lets register and reset it again
393 self.send(second_input_socket, "h")
394 self.assertIsMsgType(second_input_socket, "c")
395 self.send(monitoring_socket, "n")
396 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 0)
397 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", False)
398 self.assertMonitoring(monitoring_socket, "input.registered_workers", 2)
399
400 # and try the other way round: first unregister, then send stop
401 self.send(second_input_socket, "d", b"other_socket")
402 self.assertIsMsgType(second_input_socket, "c")
403 self.send(input_socket, "l")
404 self.assertIsMsgType(input_socket, "c")
405 self.assertMonitoring(monitoring_socket, "input.registered_workers", 1)
406 # which should also give us a stop message
407 self.get_signal(output_socket, "l")
408 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 1)
409 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", True)
410
411 # reset the state
412 self.send(second_input_socket, "h")
413 self.assertIsMsgType(second_input_socket, "c")
414 self.send(monitoring_socket, "n")
415 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 0)
416 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", False)
417 self.assertMonitoring(monitoring_socket, "input.registered_workers", 2)
418
419 # The same applies to terminate messages:
420 # Nothing at the beginning
421 self.assertMonitoring(monitoring_socket, "input.received_terminate_messages", 0)
422 self.assertMonitoring(monitoring_socket, "input.all_terminate_messages", False)
423
424 # the first terminate message should not trigger a transmission
425 self.send(input_socket, "x")
426 self.assertIsMsgType(input_socket, "c")
427 self.assertMonitoring(monitoring_socket, "input.received_terminate_messages", 1)
428 self.assertMonitoring(monitoring_socket, "input.all_terminate_messages", False)
429 self.assertNothingMore(output_socket)
430
431 # Another terminate message should not change anything
432 self.send(input_socket, "x")
433 self.assertIsMsgType(input_socket, "c")
434 self.assertMonitoring(monitoring_socket, "input.received_terminate_messages", 1)
435 self.assertMonitoring(monitoring_socket, "input.all_terminate_messages", False)
436 self.assertNothingMore(output_socket)
437
438 # But if we reset...
439 self.send(monitoring_socket, "n")
440 self.assertMonitoring(monitoring_socket, "input.received_terminate_messages", 0)
441 self.assertMonitoring(monitoring_socket, "input.all_terminate_messages", False)
442
443 # ... and send again ...
444 self.send(input_socket, "x")
445 self.assertIsMsgType(input_socket, "c")
446 self.send(second_input_socket, "x")
447 self.assertIsMsgType(second_input_socket, "c")
448
449 self.get_signal(output_socket, "x")
450
451 # ... and the collector should have terminated
452 self.assertIsDown("collector")
453
454 # TODO: test and implement: timeout in wait for stop or terminate messages
455

◆ testWrongRegistration()

testWrongRegistration ( self)
inherited
Test that the collector terminates when it receives a message from a worker that has not registered.

Definition at line 456 of file test_support.py.

456 def testWrongRegistration(self):
457 """test function"""
458 # To make it actually look for input messages, it needs at least a single worker
459 self.create_output_socket()
460
461 # Register first worker
462 input_socket = self.create_socket(self.input_port)
463 self.send(input_socket, "h")
464 self.assertIsMsgType(input_socket, "c")
465
466 # Send with a second, unregistered worker
467 second_input_socket = self.create_socket(self.input_port, identity="other_socket")
468 self.send(second_input_socket, "l")
469
470 # The collector should die
471 self.assertIsDown("collector")
472

Member Data Documentation

◆ ctx

ctx = zmq.Context()
staticinherited

The ZMQ context.

Definition at line 36 of file test_support.py.

◆ final_collector

bool final_collector = False
staticinherited

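Whether the program under test is the final collector (`b2hlt_finalcollector`) instead of the regular collector (`b2hlt_collector`).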

Definition at line 259 of file test_support.py.

◆ input_port

input_port = HLTZMQTestCase.get_free_port()
inherited

Free port number passed to the collector as its `--input` address.

Definition at line 264 of file test_support.py.

◆ monitoring_port

monitoring_port = HLTZMQTestCase.get_free_port()
inherited

Free port number passed to the collector as its `--monitor` address.

Definition at line 268 of file test_support.py.

◆ needed_programs

needed_programs = dict()
staticinherited

The dict name -> cmd args of the programs to start, needs to be set in each test.

Definition at line 38 of file test_support.py.

◆ output_port

output_port = HLTZMQTestCase.get_free_port()
inherited

Free port number passed to the collector as its `--output` address.

Definition at line 266 of file test_support.py.

◆ previous_dir

previous_dir = os.getcwd()
inherited

remember current working directory

Definition at line 58 of file test_support.py.

◆ started_programs

started_programs = dict()
inherited

dict for all started programs

Definition at line 62 of file test_support.py.

◆ tearDown

tearDown = subprocess.Popen(command, start_new_session=True)
inherited

Definition at line 68 of file test_support.py.

◆ test_dir

test_dir = tempfile.mkdtemp()
inherited

use a temporary folder for testing

Definition at line 56 of file test_support.py.


The documentation for this class was generated from the following file: