Belle II Software  release-05-01-25
test_worker.py
1 import atexit
2 import os
3 from pathlib import Path
4 from time import sleep
5 from unittest import main
6 import basf2
7 
8 from zmq_daq.test_support import HLTZMQTestCase
9 
10 
12  """Test case baseclass to spawn a worker"""
13 
14  event_data = open(basf2.find_file("daq/hbasf2/tests/out.raw"), "br").read()
15 
16 
17  extra_arguments = []
18 
19  def setUp(self):
20  """Setup necessary sockets and programs"""
21 
22  self.input_socket, input_port = self.create_router_socket(None)
23 
24  self.output_socket, output_port = self.create_router_socket(None)
25  # and set the list of necessary programs to use these sockets
26  self.needed_programs = {
27  "worker": [
28  "python3", basf2.find_file("daq/hbasf2/tests/passthrough.no_run_py"),
29  "--input", f"tcp://localhost:{input_port}",
30  "--output", f"tcp://localhost:{output_port}",
31  ] + self.extra_arguments
32  }
33  super().setUp()
34 
35  def start(self):
36  """start the needed sockets and send some hello messages"""
37  # There should be a hello message
38 
39  self.output_identity = self.assertIsMsgType(self.output_socket, "h", router=True)[0].decode()
40  self.send(self.output_socket, "c", identity=self.output_identity)
41 
42  # There are probably many more ready messages, but we are only interested in at least one here
43 
44  self.input_identity = self.assertIsMsgType(self.input_socket, "r", router=True, final=False)[0].decode()
45 
46  # Store some example events
47 
49 
51 
52 
54  """Tests for normal worker behavior"""
55  def testInitialization(self):
56  """test function"""
57  self.start()
58 
59  # Initialisation should be called
60  self.assertHasOutputFile("initialize_called", timeout=1)
61 
62  def testRunSending(self):
63  """test function"""
64  self.start()
65 
66  # Send first event (should trigger begin run again)
67  self.send(self.input_socket, "u", first_data=self.first_run_event_data[0], identity=self.input_identity)
68  self.assertHasOutputFile("beginrun_called", timeout=0.5)
69  self.assertIsMsgType(self.output_socket, "w", router=True)
70  self.send(self.output_socket, "c", identity=self.output_identity)
71 
72  # Send second event (should not trigger begin run again)
73  self.send(self.input_socket, "u", first_data=self.first_run_event_data[1], identity=self.input_identity)
74  self.assertNotHasOutputFile("beginrun_called", timeout=0.5)
75  self.assertIsMsgType(self.output_socket, "w", router=True)
76  self.send(self.output_socket, "c", identity=self.output_identity)
77 
78  def testEndRun(self):
79  """test function"""
80  self.start()
81 
82  # TODO: do I want to test which run was ended?
83  # end run trigger
84  self.send(self.input_socket, "l", identity=self.input_identity)
85  self.assertHasOutputFile("endrun_called", timeout=2)
86  self.assertIsMsgType(self.output_socket, "l", router=True)
87  self.send(self.output_socket, "c", identity=self.output_identity)
88 
89  # Also the second one should give us an end run
90  self.send(self.input_socket, "l", identity=self.input_identity)
91  self.assertHasOutputFile("endrun_called", timeout=1)
92  self.assertIsMsgType(self.output_socket, "l", router=True)
93  self.send(self.output_socket, "c", identity=self.output_identity)
94 
95  # Sneak in an event in between -> should give beginRun
96  self.send(self.input_socket, "u", first_data=self.first_run_event_data[0], identity=self.input_identity)
97  self.assertHasOutputFile("beginrun_called", timeout=1)
98  self.assertIsMsgType(self.output_socket, "w", router=True)
99  self.send(self.output_socket, "c", identity=self.output_identity)
100 
101  # And end the run again
102  self.send(self.input_socket, "l", identity=self.input_identity)
103  self.assertHasOutputFile("endrun_called", timeout=1)
104  self.assertIsMsgType(self.output_socket, "l", router=True)
105  self.send(self.output_socket, "c", identity=self.output_identity)
106 
107  # A second time...
108  self.send(self.input_socket, "l", identity=self.input_identity)
109  self.assertHasOutputFile("endrun_called", timeout=1)
110  self.assertIsMsgType(self.output_socket, "l", router=True)
111  self.send(self.output_socket, "c", identity=self.output_identity)
112 
113  # Sneak in a second event in between -> should give beginRun (as it is a new run)
114  self.send(self.input_socket, "u", first_data=self.second_run_event_data[0], identity=self.input_identity)
115  self.assertHasOutputFile("beginrun_called", timeout=1)
116  self.assertIsMsgType(self.output_socket, "w", router=True)
117  self.send(self.output_socket, "c", identity=self.output_identity)
118 
119  # And end the run again
120  self.send(self.input_socket, "l", identity=self.input_identity)
121  self.assertHasOutputFile("endrun_called", timeout=1)
122  self.assertIsMsgType(self.output_socket, "l", router=True)
123  self.send(self.output_socket, "c", identity=self.output_identity)
124 
125  # Termination should also work
126  self.send(self.input_socket, "x", identity=self.input_identity)
127  # Attention: terminate is called in the different order
128  self.assertIsMsgType(self.output_socket, "x", router=True)
129  self.send(self.output_socket, "c", identity=self.output_identity)
130  self.assertHasOutputFile("terminate_called", timeout=1)
131 
132  # And the termination should cause the process to go down
133  self.assertIsDown("worker", timeout=200)
134 
135 
137  """Test case for dying workers"""
138 
139 
140  extra_arguments = ["--exit", "--prefix", "dying_"]
141 
143  """test function"""
144  self.start()
145 
146  # lets send some events
147  for _ in range(10):
148  self.send(self.input_socket, "u", first_data=self.event_data, identity=self.input_identity)
149  self.assertIsMsgType(self.output_socket, "w", router=True)
150  self.send(self.output_socket, "c", identity=self.output_identity)
151 
152  self.assertHasOutputFile("dying_beginrun_called", timeout=1)
153 
154  # Now we kill one of the workers
155  Path("dying_exit_request").touch()
156 
157  self.send(self.input_socket, "u", first_data=self.event_data, identity=self.input_identity)
158  self.assertHasOutputFile("dying_exit_called", timeout=1)
159  msg = self.assertIsMsgType(self.output_socket, "d", router=True)
160  # the message content should be the worker that has died
161  self.assertEqual(msg[2].decode(), self.output_identity)
162  self.send(self.output_socket, "c", identity=msg[0].decode())
163 
164  self.assertIsDown("worker", timeout=10)
165 
166 
167 if __name__ == '__main__':
168  main()
test_worker.DyingWorkerTestCase.testUnregistration
def testUnregistration(self)
Definition: test_worker.py:142
zmq_daq.test_support.HLTZMQTestCase
Definition: test_support.py:17
test_worker.NormalWorkerTestCase.testInitialization
def testInitialization(self)
Definition: test_worker.py:55
zmq_daq.test_support.HLTZMQTestCase.needed_programs
needed_programs
The dict name -> cmd args of the programs to start, needs to be set in each test.
Definition: test_support.py:26
zmq_daq.test_support.HLTZMQTestCase.assertIsDown
def assertIsDown(self, name, timeout=5, minimum_delay=0.1)
Definition: test_support.py:81
test_worker.WorkerTestCase.first_run_event_data
first_run_event_data
some data
Definition: test_worker.py:48
zmq_daq.test_support.HLTZMQTestCase.assertIsMsgType
def assertIsMsgType(self, socket, message_type, final=True, router=False)
Definition: test_support.py:203
test_worker.WorkerTestCase.output_identity
output_identity
output_identity
Definition: test_worker.py:39
zmq_daq.test_support.HLTZMQTestCase.assertNotHasOutputFile
def assertNotHasOutputFile(self, output_file, timeout=0.5)
Definition: test_support.py:233
test_worker.WorkerTestCase.extra_arguments
list extra_arguments
extra arguments to pass to the worker script
Definition: test_worker.py:17
test_worker.NormalWorkerTestCase
Definition: test_worker.py:53
test_worker.WorkerTestCase.event_data
event_data
event_data
Definition: test_worker.py:14
zmq_daq.test_support.HLTZMQTestCase.assertHasOutputFile
def assertHasOutputFile(self, output_file, unlink=True, timeout=0.5, minimum_delay=0.1)
Definition: test_support.py:215
test_worker.WorkerTestCase.input_identity
input_identity
input_identity
Definition: test_worker.py:44
main
int main(int argc, char **argv)
Run all tests.
Definition: test_main.cc:77
test_worker.WorkerTestCase.start
def start(self)
Definition: test_worker.py:35
zmq_daq.test_support.HLTZMQTestCase.send
def send(socket, message_type, first_data=b"", second_data=b"", identity="")
Definition: test_support.py:136
test_worker.NormalWorkerTestCase.testRunSending
def testRunSending(self)
Definition: test_worker.py:62
zmq_daq.test_support
Definition: test_support.py:1
test_worker.WorkerTestCase
Definition: test_worker.py:11
test_worker.NormalWorkerTestCase.testEndRun
def testEndRun(self)
Definition: test_worker.py:78
test_worker.WorkerTestCase.setUp
def setUp(self)
Definition: test_worker.py:19
test_worker.DyingWorkerTestCase
Definition: test_worker.py:136
zmq_daq.test_support.HLTZMQTestCase.create_router_socket
def create_router_socket(port)
Definition: test_support.py:128
test_worker.WorkerTestCase.second_run_event_data
second_run_event_data
some data
Definition: test_worker.py:50