|
def | testUnregistration (self) |
|
def | setUp (self) |
|
def | start (self) |
|
def | tearDown (self) |
|
def | assertIsDown (self, name, timeout=5, minimum_delay=0.1) |
|
def | assertIsRunning (self, name) |
|
def | assertMonitoring (self, socket, search_key, search_value, timeout=10) |
|
def | assertIsAndGet (self, socket, message_type, final=True, router=False) |
|
def | assertIsMsgType (self, socket, message_type, final=True, router=False) |
|
def | assertNothingMore (self, socket) |
|
def | assertHasOutputFile (self, output_file, unlink=True, timeout=0.5, minimum_delay=0.1) |
|
def | assertNotHasOutputFile (self, output_file, timeout=0.5) |
|
|
list | extra_arguments = ["--exit", "--prefix", "dying_"] |
| set the extra arguments we need ...
|
|
| event_data = open(basf2.find_file("daq/hbasf2/tests/out.raw"), "br").read() |
| event_data
|
|
| ctx = zmq.Context() |
| The ZMQ context.
|
|
Test case for dying workers
Definition at line 136 of file test_worker.py.
◆ _is_running()
def _is_running |
( |
|
self, |
|
|
|
name |
|
) |
| |
|
privateinherited |
Check if a given program is still running.
Definition at line 72 of file test_support.py.
◆ assertHasOutputFile()
def assertHasOutputFile |
( |
|
self, |
|
|
|
output_file, |
|
|
|
unlink = True , |
|
|
|
timeout = 0.5 , |
|
|
|
minimum_delay = 0.1 |
|
) |
| |
|
inherited |
Assert that - at least after the given timeout - the output file
is present. If unlink is set to True, remove the file after checking.
Definition at line 215 of file test_support.py.
◆ assertIsAndGet()
def assertIsAndGet |
( |
|
self, |
|
|
|
socket, |
|
|
|
message_type, |
|
|
|
final = True , |
|
|
|
router = False |
|
) |
| |
|
inherited |
Assert that the next message received on the socket has the given message type.
If final is set to True, also assert that there is no additional message on the socket.
Use router only for router sockets.
Definition at line 188 of file test_support.py.
◆ assertIsDown()
def assertIsDown |
( |
|
self, |
|
|
|
name, |
|
|
|
timeout = 5 , |
|
|
|
minimum_delay = 0.1 |
|
) |
| |
|
inherited |
Test helper to assert the given program has terminated - at least after timeout in seconds has passed.
Checks every "minimal_delay seconds.
Definition at line 81 of file test_support.py.
◆ assertIsMsgType()
def assertIsMsgType |
( |
|
self, |
|
|
|
socket, |
|
|
|
message_type, |
|
|
|
final = True , |
|
|
|
router = False |
|
) |
| |
|
inherited |
◆ assertIsRunning()
def assertIsRunning |
( |
|
self, |
|
|
|
name |
|
) |
| |
|
inherited |
Assert that a given program is still running.
Definition at line 96 of file test_support.py.
◆ assertMonitoring()
def assertMonitoring |
( |
|
self, |
|
|
|
socket, |
|
|
|
search_key, |
|
|
|
search_value, |
|
|
|
timeout = 10 |
|
) |
| |
|
inherited |
Ask the given socket for a monitoring JSON and make sure, the value related to "search_key"
is set to "search_value" - at least after the given timeout.
The search key can should be in the form "<category>.<key>".
Definition at line 157 of file test_support.py.
◆ assertNotHasOutputFile()
def assertNotHasOutputFile |
( |
|
self, |
|
|
|
output_file, |
|
|
|
timeout = 0.5 |
|
) |
| |
|
inherited |
Assert that after the timeout the given file is not present
(a.k.a. no process has created it)
Definition at line 233 of file test_support.py.
◆ assertNothingMore()
def assertNothingMore |
( |
|
self, |
|
|
|
socket |
|
) |
| |
|
inherited |
Assert that there is no pending message to be received on the socket.
Definition at line 209 of file test_support.py.
◆ create_router_socket()
def create_router_socket |
( |
|
port | ) |
|
|
staticinherited |
Shortcut to create a ROUTER type socket with the typical parameters
binding to the given port.
Definition at line 128 of file test_support.py.
◆ create_socket()
def create_socket |
( |
|
port, |
|
|
|
socket_type = zmq.DEALER , |
|
|
|
identity = "socket" , |
|
|
|
bind = False |
|
) |
| |
|
staticinherited |
Create and return a ZMQ socket with the given type and identity and
bind or connect it to localhost and the given port.
Definition at line 103 of file test_support.py.
◆ get_free_port()
Get a free port number by reusing ZMQ's function for this.
Definition at line 29 of file test_support.py.
◆ recv()
Try to receive a message from the socket (or throw an assertion error if none comes after the set timeout
of the socket).
Definition at line 147 of file test_support.py.
◆ send()
def send |
( |
|
socket, |
|
|
|
message_type, |
|
|
|
first_data = b"" , |
|
|
|
second_data = b"" , |
|
|
|
identity = "" |
|
) |
| |
|
staticinherited |
Send a message consisting of the message type, the first and the second data
either to the identity if given or without identity if omitted.
Definition at line 136 of file test_support.py.
◆ setUp()
Setup necessary sockets and programs
Reimplemented from HLTZMQTestCase.
Definition at line 19 of file test_worker.py.
20 """Setup necessary sockets and programs"""
22 self.input_socket, input_port = self.create_router_socket(
None)
24 self.output_socket, output_port = self.create_router_socket(
None)
26 self.needed_programs = {
28 "python3", basf2.find_file(
"daq/hbasf2/tests/passthrough.no_run_py"),
29 "--input", f
"tcp://localhost:{input_port}",
30 "--output", f
"tcp://localhost:{output_port}",
31 ] + self.extra_arguments
◆ start()
start the needed sockets and send some hello messages
Definition at line 35 of file test_worker.py.
◆ tearDown()
Custom tearDown function to kill the started programs if still present
and remove the temporary folder again.
Definition at line 58 of file test_support.py.
◆ testUnregistration()
def testUnregistration |
( |
|
self | ) |
|
◆ needed_programs
The documentation for this class was generated from the following file: