9 from pathlib import Path
10 from unittest import main
17 """Test case baseclass to spawn a worker"""
19 event_data = open(basf2.find_file("daq/hbasf2/tests/out.raw"), "br").read()
25 """Setup necessary sockets and programs"""
33 "python3", basf2.find_file("daq/hbasf2/tests/passthrough.no_run_py"),
34 "--input", f"tcp://localhost:{input_port}",
35 "--output", f"tcp://localhost:{output_port}",
41 """start the needed sockets and send some hello messages"""
59 """Tests for normal worker behavior"""
75 self.assertIsMsgType(self.output_socket, "w", router=True)
81 self.assertIsMsgType(self.output_socket, "w", router=True)
92 self.assertIsMsgType(self.output_socket, "l", router=True)
98 self.assertIsMsgType(self.output_socket, "l", router=True)
104 self.assertIsMsgType(self.output_socket, "w", router=True)
110 self.assertIsMsgType(self.output_socket, "l", router=True)
116 self.assertIsMsgType(self.output_socket, "l", router=True)
122 self.assertIsMsgType(self.output_socket, "w", router=True)
128 self.assertIsMsgType(self.output_socket, "l", router=True)
134 self.assertIsMsgType(self.output_socket, "x", router=True)
143 """Test case for dying workers"""
146 extra_arguments = ["--exit", "--prefix", "dying_"]
155 self.assertIsMsgType(self.output_socket, "w", router=True)
161 Path("dying_exit_request").touch()
165 msg = self.assertIsMsgType(self.output_socket, "d", router=True)
168 self.send(self.output_socket, "c", identity=msg[0].decode())
173 if __name__ == '__main__':
175 number_of_failures = 0
177 for i in range(ZMQ_TEST_FOR_LOOPS):
180 except AssertionError:
181 number_of_failures += 1
184 message = f'Number of failed for loops: {number_of_failures}/{ZMQ_TEST_FOR_LOOPS}'
185 if number_of_failures <= ZMQ_TEST_MAX_FAILURES:
186 basf2.B2INFO(message)
188 basf2.B2FATAL(message)
def testUnregistration(self)
def testInitialization(self)
list extra_arguments
extra arguments to pass to the worker script
output_identity
output_identity
needed_programs
input_socket
first_run_event_data
some data
input_identity
input_identity
second_run_event_data
some data
def assertIsMsgType(self, socket, message_type, final=True, router=False)
needed_programs
The dict mapping name -> cmd args of the programs to start; it needs to be set in each test.
def assertHasOutputFile(self, output_file, unlink=True, timeout=0.5, minimum_delay=0.1)
def send(socket, message_type, first_data=b"", second_data=b"", identity="")
def create_router_socket(port)
def assertNotHasOutputFile(self, output_file, timeout=0.5)
def assertIsDown(self, name, timeout=5, minimum_delay=0.1)