Belle II Software
release-08-01-10
Public Member Functions | |
def | setUp (self) |
def | tearDown (self) |
def | assertIsDown (self, name, timeout=5, minimum_delay=0.1) |
def | assertIsRunning (self, name) |
def | assertMonitoring (self, socket, search_key, search_value, timeout=10) |
def | assertIsAndGet (self, socket, message_type, final=True, router=False) |
def | assertIsMsgType (self, socket, message_type, final=True, router=False) |
def | assertNothingMore (self, socket) |
def | assertHasOutputFile (self, output_file, unlink=True, timeout=0.5, minimum_delay=0.1) |
def | assertNotHasOutputFile (self, output_file, timeout=0.5) |
Static Public Member Functions | |
def | get_free_port () |
def | create_socket (port, socket_type=zmq.DEALER, identity="socket", bind=False) |
def | create_router_socket (port) |
def | send (socket, message_type, first_data=b"", second_data=b"", identity="") |
def | recv (socket) |
Public Attributes | |
test_dir | |
use a temporary folder for testing | |
previous_dir | |
remember current working directory | |
started_programs | |
dict for all started programs | |
Static Public Attributes | |
ctx = zmq.Context() | |
The ZMQ context. | |
needed_programs = dict() | |
The dict name -> cmd args of the programs to start, needs to be set in each test. | |
Private Member Functions | |
def | _is_running (self, name) |
Base class for all HLT ZMQ tests. It helps to start the needed programs, create ZMQ sockets, and send and receive messages easily. It also provides helpers for asserting test correctness.
Definition at line 29 of file test_support.py.
def _is_running (self, name) | private |
Check if a given program is still running.
Definition at line 84 of file test_support.py.
def assertHasOutputFile (self, output_file, unlink=True, timeout=0.5, minimum_delay=0.1)
Assert that the output file is present, at the latest once the given timeout has elapsed. If unlink is set to True, remove the file after checking.
Definition at line 227 of file test_support.py.
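The wait-until-present behaviour described for assertHasOutputFile can be illustrated with a small polling loop. This is a minimal sketch using only the standard library, not the code from test_support.py; the helper name `wait_for_file` and the file name `output.txt` are hypothetical, and only the parameter names and defaults are taken from the signature above.

```python
import os
import tempfile
import time


def wait_for_file(path, unlink=True, timeout=0.5, minimum_delay=0.1):
    """Return True if `path` shows up within `timeout` seconds, else False."""
    deadline = time.monotonic() + timeout
    while True:
        if os.path.exists(path):
            if unlink:
                os.unlink(path)  # remove the file so a later check starts fresh
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(minimum_delay)  # wait a little before looking again


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "output.txt")  # hypothetical output file
    open(target, "w").close()
    print(wait_for_file(target))               # True: file exists and is removed
    print(wait_for_file(target, timeout=0.2))  # False: file was unlinked above
```

A real test would turn the boolean into an assertion, for example with `self.assertTrue(...)`.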
def assertIsAndGet (self, socket, message_type, final=True, router=False)
Assert that the next message received on the socket has the given message type. If final is set to True, also assert that there is no additional message on the socket. Use router only for router sockets.
Definition at line 200 of file test_support.py.
def assertIsDown (self, name, timeout=5, minimum_delay=0.1)
Test helper to assert that the given program has terminated, at the latest once timeout seconds have passed. Checks every minimum_delay seconds.
Definition at line 93 of file test_support.py.
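The same poll-with-deadline idea applies to waiting for a program to terminate. The sketch below assumes the started programs are `subprocess.Popen` objects, which this page does not state; the helper name `is_down` is hypothetical.

```python
import subprocess
import sys
import time


def is_down(process, timeout=5, minimum_delay=0.1):
    """Poll `process` until it has exited or `timeout` seconds have passed."""
    deadline = time.monotonic() + timeout
    while process.poll() is None:  # None means the process is still running
        if time.monotonic() >= deadline:
            return False
        time.sleep(minimum_delay)
    return True


# A child process that exits on its own after a short moment.
short_lived = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(0.2)"])
print(is_down(short_lived))
```

Checking in small steps rather than sleeping for the full timeout lets a test finish as soon as the program has actually stopped.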
def assertIsMsgType (self, socket, message_type, final=True, router=False)
Deprecated copy of "assertIsAndGet".
Definition at line 215 of file test_support.py.
def assertIsRunning (self, name)
Assert that a given program is still running.
Definition at line 108 of file test_support.py.
def assertMonitoring (self, socket, search_key, search_value, timeout=10)
Ask the given socket for a monitoring JSON and make sure the value related to "search_key" is set to "search_value", at the latest once the given timeout has elapsed. The search key should be in the form "<category>.<key>".
Definition at line 169 of file test_support.py.
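How a "<category>.<key>" search key might be resolved against a monitoring JSON can be sketched as follows. The two-level layout (categories containing keys) is an assumption derived from the key format; the helper name `lookup` and the example payload are hypothetical and not taken from a real HLT process.

```python
import json


def lookup(monitoring_json, search_key):
    """Resolve a "<category>.<key>" search key in a monitoring JSON string."""
    # Split only at the first dot, so the key part may itself contain dots.
    category, key = search_key.split(".", 1)
    monitoring = json.loads(monitoring_json)
    return monitoring[category][key]


# Hypothetical monitoring payload for illustration only.
example = json.dumps({"input": {"registered_workers": 2}})
print(lookup(example, "input.registered_workers"))  # prints 2
```

An assertion helper would repeat such a lookup until the value equals the expected one or the timeout has elapsed.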
def assertNotHasOutputFile (self, output_file, timeout=0.5)
Assert that the given file is not present after the timeout (i.e. no process has created it).
Definition at line 245 of file test_support.py.
def assertNothingMore (self, socket)
Assert that there is no pending message to be received on the socket.
Definition at line 221 of file test_support.py.
def create_router_socket (port) | static |
Shortcut to create a ROUTER type socket with the typical parameters binding to the given port.
Definition at line 140 of file test_support.py.
def create_socket (port, socket_type=zmq.DEALER, identity="socket", bind=False) | static |
Create and return a ZMQ socket with the given type and identity and bind or connect it to localhost and the given port.
Definition at line 115 of file test_support.py.
def get_free_port () | static |
Get a free port number by reusing ZMQ's function for this.
Definition at line 41 of file test_support.py.
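For readers unfamiliar with the idea of asking for a free port, here is a sketch that swaps in a different technique: instead of ZMQ's own function, it lets the operating system pick a port through the standard library `socket` module. The helper name `find_free_port` is hypothetical and this is not the method used in test_support.py.

```python
import socket


def find_free_port(host="127.0.0.1"):
    """Ask the operating system for a currently unused TCP port number."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))  # port 0 lets the OS choose any free port
        return sock.getsockname()[1]


port = find_free_port()
print(port)
```

The port is released again when the helper returns, so another process could in principle claim it before the test binds to it. For short-lived local tests this race is usually acceptable.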
def recv (socket) | static |
Try to receive a message from the socket (or raise an assertion error if none arrives within the socket's configured timeout).
Definition at line 159 of file test_support.py.
def send (socket, message_type, first_data=b"", second_data=b"", identity="") | static |
Send a message consisting of the message type, the first data and the second data, either to the given identity or without an identity if it is omitted.
Definition at line 148 of file test_support.py.
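One way to picture the message layout described for send is as a list of frames for a multipart message. The sketch below is an assumption about that layout, not the documented wire format: it treats message_type and identity as strings that are encoded to bytes, and puts the identity frame first when one is given. The helper name `build_frames` is hypothetical.

```python
def build_frames(message_type, first_data=b"", second_data=b"", identity=""):
    """Assemble the frames of a message as a list of bytes objects."""
    # Assumed layout: [identity (optional), message type, first data, second data]
    frames = [message_type.encode(), first_data, second_data]
    if identity:
        frames.insert(0, identity.encode())  # address frame for router sockets
    return frames


print(build_frames("h"))
print(build_frames("r", b"payload", identity="worker"))
```

With pyzmq, such a list could be handed to `socket.send_multipart(frames)`.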
def setUp (self)
Custom setUp function to go into a temporary folder and start the needed programs.
Reimplemented in WorkerTestCase, DyingHLTTestCase, HLTTestCase, HistogramTestCase, DistributorTestCase, and BaseCollectorTestCase.
Definition at line 50 of file test_support.py.
def tearDown (self)
Custom tearDown function to kill the started programs if still present and remove the temporary folder again.
Definition at line 70 of file test_support.py.
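The setUp and tearDown pattern described above (temporary folder, started programs, cleanup) can be sketched with the standard library alone. `ProgramTestCase` and `SleeperTest` are hypothetical stand-ins, not the classes from test_support.py; only the attribute names test_dir, previous_dir, started_programs and needed_programs are taken from this page, and the use of `subprocess.Popen` is an assumption.

```python
import os
import shutil
import subprocess
import sys
import tempfile
import unittest


class ProgramTestCase(unittest.TestCase):
    """Hypothetical stand-in for the base class, standard library only."""

    # name -> command line arguments, to be set by each concrete test
    needed_programs = dict()

    def setUp(self):
        # Work inside a fresh temporary folder and remember where we came from.
        self.test_dir = tempfile.mkdtemp()
        self.previous_dir = os.getcwd()
        os.chdir(self.test_dir)
        self.started_programs = {
            name: subprocess.Popen(cmd) for name, cmd in self.needed_programs.items()
        }

    def tearDown(self):
        for process in self.started_programs.values():
            if process.poll() is None:  # still running, so stop it
                process.kill()
            process.wait()
        os.chdir(self.previous_dir)
        shutil.rmtree(self.test_dir)


class SleeperTest(ProgramTestCase):
    # Hypothetical long-running program that the test relies on.
    needed_programs = {"sleeper": [sys.executable, "-c", "import time; time.sleep(30)"]}

    def test_program_is_running(self):
        self.assertIsNone(self.started_programs["sleeper"].poll())
```

Saved to a file, the example can be run with `python -m unittest`. A concrete test only declares needed_programs and relies on the base class to start and stop them.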