Belle II Software development
test_worker.py
1
8
9from pathlib import Path
10from unittest import main
11import basf2
12
13from zmq_daq.test_support import HLTZMQTestCase, ZMQ_TEST_FOR_LOOPS, ZMQ_TEST_MAX_FAILURES
14
15
17 """Test case baseclass to spawn a worker"""
18
# Raw bytes of the example event file, read once when the class body is executed.
# Path.read_bytes() opens, reads and closes the file, so no handle is left open.
event_data = Path(basf2.find_file("daq/hbasf2/tests/out.raw")).read_bytes()

# extra arguments to pass to the worker script
extra_arguments = []
23
def setUp(self):
    """Setup necessary sockets and programs"""
    # Create the two router sockets; each call also yields the port the
    # worker is pointed at below.
    self.input_socket, input_port = self.create_router_socket(None)
    self.output_socket, output_port = self.create_router_socket(None)

    # and set the list of necessary programs to use these sockets
    # NOTE(review): the assignment target of this dict was missing from the
    # listing; `needed_programs` (name -> command line of the programs to
    # start) is inferred from the base-class attribute documentation - confirm.
    self.needed_programs = {
        "worker": [
            "python3", basf2.find_file("daq/hbasf2/tests/passthrough.no_run_py"),
            "--input", f"tcp://localhost:{input_port}",
            "--output", f"tcp://localhost:{output_port}",
        ] + self.extra_arguments
    }
    super().setUp()
39
def start(self):
    """start the needed sockets and send some hello messages"""
    # The output side must greet us with a hello ("h") message; remember who
    # sent it and answer with a confirmation ("c").
    hello_message = self.assertIsMsgType(self.output_socket, "h", router=True)
    self.output_identity = hello_message[0].decode()
    self.send(self.output_socket, "c", identity=self.output_identity)

    # On the input side one ready ("r") message is enough for us, even though
    # more of them are probably queued (hence final=False).
    ready_message = self.assertIsMsgType(self.input_socket, "r", router=True, final=False)
    self.input_identity = ready_message[0].decode()
50
51 # Store some example events
52
54
56
57
59 """Tests for normal worker behavior"""
60
62 """test function"""
63 self.start()
64
65 # Initialisation should be called
66 self.assertHasOutputFile("initialize_called", timeout=1)
67
def testRunSending(self):
    """test function"""

    def push_event(payload):
        # Hand one event to the worker through the input socket.
        self.send(self.input_socket, "u", first_data=payload, identity=self.input_identity)

    def confirm_forwarded_event():
        # The event has to show up on the output side ("w") and is then confirmed ("c").
        self.assertIsMsgType(self.output_socket, "w", router=True)
        self.send(self.output_socket, "c", identity=self.output_identity)

    self.start()

    # Send first event (should trigger begin run again)
    push_event(self.first_run_event_data[0])
    self.assertHasOutputFile("beginrun_called", timeout=0.5)
    confirm_forwarded_event()

    # Send second event (should not trigger begin run again)
    push_event(self.first_run_event_data[1])
    self.assertNotHasOutputFile("beginrun_called", timeout=0.5)
    confirm_forwarded_event()
83
def testEndRun(self):
    """test function"""

    def confirm_output(message_type):
        # Expect a message of the given type on the output socket and confirm it.
        self.assertIsMsgType(self.output_socket, message_type, router=True)
        self.send(self.output_socket, "c", identity=self.output_identity)

    def trigger_end_run(timeout=1):
        # Send the end run trigger and check that endRun was called and forwarded.
        self.send(self.input_socket, "l", identity=self.input_identity)
        self.assertHasOutputFile("endrun_called", timeout=timeout)
        confirm_output("l")

    def push_event_expecting_begin_run(payload):
        # Send an event which has to cause a beginRun before it is forwarded.
        self.send(self.input_socket, "u", first_data=payload, identity=self.input_identity)
        self.assertHasOutputFile("beginrun_called", timeout=1)
        confirm_output("w")

    self.start()

    # TODO: do I want to test which run was ended?
    # end run trigger (the first one is given a longer timeout)
    trigger_end_run(timeout=2)

    # Also the second one should give us an end run
    trigger_end_run()

    # Sneak in an event in between -> should give beginRun
    push_event_expecting_begin_run(self.first_run_event_data[0])

    # And end the run again
    trigger_end_run()

    # A second time...
    trigger_end_run()

    # Sneak in a second event in between -> should give beginRun (as it is a new run)
    push_event_expecting_begin_run(self.second_run_event_data[0])

    # And end the run again
    trigger_end_run()

    # Termination should also work
    self.send(self.input_socket, "x", identity=self.input_identity)
    # Attention: terminate is called in the different order
    # (the message is checked first, the output file afterwards)
    confirm_output("x")
    self.assertHasOutputFile("terminate_called", timeout=1)

    # And the termination should cause the process to go down
    self.assertIsDown("worker", timeout=200)
140
141
143 """Test case for dying workers"""
144
145
146 extra_arguments = ["--exit", "--prefix", "dying_"]
147
149 """test function"""
150 self.start()
151
152 # lets send some events
153 for _ in range(10):
154 self.send(self.input_socket, "u", first_data=self.event_data, identity=self.input_identity)
155 self.assertIsMsgType(self.output_socket, "w", router=True)
156 self.send(self.output_socket, "c", identity=self.output_identity)
157
158 self.assertHasOutputFile("dying_beginrun_called", timeout=1)
159
160 # Now we kill one of the workers
161 Path("dying_exit_request").touch()
162
163 self.send(self.input_socket, "u", first_data=self.event_data, identity=self.input_identity)
164 self.assertHasOutputFile("dying_exit_called", timeout=1)
165 msg = self.assertIsMsgType(self.output_socket, "d", router=True)
166 # the message content should be the worker that has died
167 self.assertEqual(msg[2].decode(), self.output_identity)
168 self.send(self.output_socket, "c", identity=msg[0].decode())
169
170 self.assertIsDown("worker", timeout=10)
171
172
if __name__ == '__main__':
    # Run the whole test suite ZMQ_TEST_FOR_LOOPS times and count how many of
    # these repetitions were not successful. Up to ZMQ_TEST_MAX_FAILURES failed
    # repetitions are tolerated.
    number_of_failures = 0

    for _ in range(ZMQ_TEST_FOR_LOOPS):
        try:
            # With exit=False unittest does not call sys.exit(); failing tests
            # are only recorded in the returned program's result object.
            program = main(exit=False)
        except AssertionError:
            # Kept for backward compatibility in case an assertion escapes the runner.
            number_of_failures += 1
        else:
            if not program.result.wasSuccessful():
                number_of_failures += 1

    message = f'Number of failed for loops: {number_of_failures}/{ZMQ_TEST_FOR_LOOPS}'
    if number_of_failures <= ZMQ_TEST_MAX_FAILURES:
        basf2.B2INFO(message)
    else:
        basf2.B2FATAL(message)
list extra_arguments
extra arguments to pass to the worker script
Definition: test_worker.py:22
output_identity
output_identity
Definition: test_worker.py:44
input_identity
input_identity
Definition: test_worker.py:49
open event_data
event_data
Definition: test_worker.py:19
def assertIsMsgType(self, socket, message_type, final=True, router=False)
dict needed_programs
The dict name -> cmd args of the programs to start, needs to be set in each test.
Definition: test_support.py:38
def assertHasOutputFile(self, output_file, unlink=True, timeout=0.5, minimum_delay=0.1)
def send(socket, message_type, first_data=b"", second_data=b"", identity="")
def assertNotHasOutputFile(self, output_file, timeout=0.5)
def assertIsDown(self, name, timeout=5, minimum_delay=0.1)
Definition: test_support.py:93
Definition: main.py:1