Belle II Software  release-08-01-10
test_worker.py
1 
8 
9 from pathlib import Path
10 from unittest import main
11 import basf2
12 
13 from zmq_daq.test_support import HLTZMQTestCase, ZMQ_TEST_FOR_LOOPS, ZMQ_TEST_MAX_FAILURES
14 
15 
17  """Test case baseclass to spawn a worker"""
18 
#: raw event data sent to the worker in the tests; read once via
#: Path.read_bytes() so the file handle is closed right after reading
event_data = Path(basf2.find_file("daq/hbasf2/tests/out.raw")).read_bytes()

#: extra arguments to pass to the worker script
extra_arguments = []
23 
def setUp(self):
    """Set up the sockets and the worker program needed by each test.

    Two router sockets are created with ``create_router_socket(None)``.
    The ports returned alongside them become the ``--input`` and
    ``--output`` addresses of the worker script, which is registered in
    ``needed_programs`` together with ``extra_arguments`` before the
    base class ``setUp`` runs.
    """
    # NOTE(review): passing None presumably lets the helper pick a free
    # port -- confirm against test_support.py.
    #: socket the worker connects to with its ``--input`` address
    self.input_socket, input_port = self.create_router_socket(None)
    #: socket the worker connects to with its ``--output`` address
    self.output_socket, output_port = self.create_router_socket(None)
    # and set the list of necessary programs to use these sockets
    self.needed_programs = {
        "worker": [
            "python3", basf2.find_file("daq/hbasf2/tests/passthrough.no_run_py"),
            "--input", f"tcp://localhost:{input_port}",
            "--output", f"tcp://localhost:{output_port}",
        ] + self.extra_arguments
    }
    super().setUp()
39 
def start(self):
    """Do the initial handshake with the worker and prepare example events.

    Waits for an "h" message on the output socket and an "r" message on
    the input socket, stores the decoded first frame of each as
    ``output_identity`` / ``input_identity``, and answers the "h" message
    with a "c" message. Afterwards two lists of example events are stored
    for the tests to send.
    """
    # There should be a hello message
    #: identity of the worker on the output socket
    self.output_identity = self.assertIsMsgType(self.output_socket, "h", router=True)[0].decode()
    self.send(self.output_socket, "c", identity=self.output_identity)

    # There are probably many more ready messages, but we are only interested in at least one here
    #: identity of the worker on the input socket
    self.input_identity = self.assertIsMsgType(self.input_socket, "r", router=True, final=False)[0].decode()

    # Store some example events
    #: events sent as part of the first run
    self.first_run_event_data = [self.event_data, self.event_data]
    #: events sent as part of the second run
    self.second_run_event_data = [self.event_data, self.event_data]
56 
57 
59  """Tests for normal worker behavior"""
60 
def testInitialization(self):
    """Check that the worker reports its initialization after start-up."""
    self.start()

    # Initialisation should be called
    self.assertHasOutputFile("initialize_called", timeout=1)
67 
def testRunSending(self):
    """Check that only the first of two events of a run triggers begin run.

    Each event is sent as a "u" message on the input socket; the worker is
    expected to answer with a "w" message on the output socket, which is
    then answered with a "c" message.
    """
    self.start()

    # Send first event (should trigger begin run again)
    self.send(self.input_socket, "u", first_data=self.first_run_event_data[0], identity=self.input_identity)
    self.assertHasOutputFile("beginrun_called", timeout=0.5)
    self.assertIsMsgType(self.output_socket, "w", router=True)
    self.send(self.output_socket, "c", identity=self.output_identity)

    # Send second event (should not trigger begin run again)
    self.send(self.input_socket, "u", first_data=self.first_run_event_data[1], identity=self.input_identity)
    self.assertNotHasOutputFile("beginrun_called", timeout=0.5)
    self.assertIsMsgType(self.output_socket, "w", router=True)
    self.send(self.output_socket, "c", identity=self.output_identity)
83 
def testEndRun(self):
    """Check end-run, begin-run and termination handling of the worker.

    "l" messages on the input socket are expected to produce an
    "endrun_called" file and an "l" message on the output socket; events
    ("u") sent in between are expected to produce "beginrun_called" and a
    "w" message. Finally an "x" message must be forwarded, produce
    "terminate_called" and bring the worker process down. Every message
    received on the output socket is answered with a "c" message.
    """
    self.start()

    # TODO: do I want to test which run was ended?
    # end run trigger
    self.send(self.input_socket, "l", identity=self.input_identity)
    self.assertHasOutputFile("endrun_called", timeout=2)
    self.assertIsMsgType(self.output_socket, "l", router=True)
    self.send(self.output_socket, "c", identity=self.output_identity)

    # Also the second one should give us an end run
    self.send(self.input_socket, "l", identity=self.input_identity)
    self.assertHasOutputFile("endrun_called", timeout=1)
    self.assertIsMsgType(self.output_socket, "l", router=True)
    self.send(self.output_socket, "c", identity=self.output_identity)

    # Sneak in an event in between -> should give beginRun
    self.send(self.input_socket, "u", first_data=self.first_run_event_data[0], identity=self.input_identity)
    self.assertHasOutputFile("beginrun_called", timeout=1)
    self.assertIsMsgType(self.output_socket, "w", router=True)
    self.send(self.output_socket, "c", identity=self.output_identity)

    # And end the run again
    self.send(self.input_socket, "l", identity=self.input_identity)
    self.assertHasOutputFile("endrun_called", timeout=1)
    self.assertIsMsgType(self.output_socket, "l", router=True)
    self.send(self.output_socket, "c", identity=self.output_identity)

    # A second time...
    self.send(self.input_socket, "l", identity=self.input_identity)
    self.assertHasOutputFile("endrun_called", timeout=1)
    self.assertIsMsgType(self.output_socket, "l", router=True)
    self.send(self.output_socket, "c", identity=self.output_identity)

    # Sneak in a second event in between -> should give beginRun (as it is a new run)
    self.send(self.input_socket, "u", first_data=self.second_run_event_data[0], identity=self.input_identity)
    self.assertHasOutputFile("beginrun_called", timeout=1)
    self.assertIsMsgType(self.output_socket, "w", router=True)
    self.send(self.output_socket, "c", identity=self.output_identity)

    # And end the run again
    self.send(self.input_socket, "l", identity=self.input_identity)
    self.assertHasOutputFile("endrun_called", timeout=1)
    self.assertIsMsgType(self.output_socket, "l", router=True)
    self.send(self.output_socket, "c", identity=self.output_identity)

    # Termination should also work
    self.send(self.input_socket, "x", identity=self.input_identity)
    # Attention: terminate is called in the different order
    self.assertIsMsgType(self.output_socket, "x", router=True)
    self.send(self.output_socket, "c", identity=self.output_identity)
    self.assertHasOutputFile("terminate_called", timeout=1)

    # And the termination should cause the process to go down
    self.assertIsDown("worker", timeout=200)
140 
141 
143  """Test case for dying workers"""
144 
145 
146  extra_arguments = ["--exit", "--prefix", "dying_"]
147 
149  """test function"""
150  self.startstart()
151 
152  # lets send some events
153  for _ in range(10):
154  self.sendsend(self.input_socket, "u", first_data=self.event_dataevent_data, identity=self.input_identityinput_identity)
155  self.assertIsMsgTypeassertIsMsgType(self.output_socket, "w", router=True)
156  self.sendsend(self.output_socket, "c", identity=self.output_identityoutput_identity)
157 
158  self.assertHasOutputFileassertHasOutputFile("dying_beginrun_called", timeout=1)
159 
160  # Now we kill one of the workers
161  Path("dying_exit_request").touch()
162 
163  self.sendsend(self.input_socket, "u", first_data=self.event_dataevent_data, identity=self.input_identityinput_identity)
164  self.assertHasOutputFileassertHasOutputFile("dying_exit_called", timeout=1)
165  msg = self.assertIsMsgTypeassertIsMsgType(self.output_socket, "d", router=True)
166  # the message content should be the worker that has died
167  self.assertEqual(msg[2].decode(), self.output_identityoutput_identity)
168  self.sendsend(self.output_socket, "c", identity=msg[0].decode())
169 
170  self.assertIsDownassertIsDown("worker", timeout=10)
171 
172 
if __name__ == '__main__':
    # Run the whole test suite ZMQ_TEST_FOR_LOOPS times and count how many
    # of those repetitions contained at least one failing test.
    number_of_failures = 0

    for _ in range(ZMQ_TEST_FOR_LOOPS):
        try:
            # With exit=False unittest does not call sys.exit(); failing
            # tests are recorded on the returned program's result object
            # instead of being raised, so the result has to be inspected.
            program = main(exit=False)
        except AssertionError:
            # Kept for assertion errors raised outside of a test case.
            number_of_failures += 1
        else:
            if not program.result.wasSuccessful():
                number_of_failures += 1

    # Tolerate up to ZMQ_TEST_MAX_FAILURES failed repetitions.
    message = f'Number of failed for loops: {number_of_failures}/{ZMQ_TEST_FOR_LOOPS}'
    if number_of_failures <= ZMQ_TEST_MAX_FAILURES:
        basf2.B2INFO(message)
    else:
        basf2.B2FATAL(message)
list extra_arguments
extra arguments to pass to the worker script
Definition: test_worker.py:22
output_identity
output_identity
Definition: test_worker.py:44
input_identity
input_identity
Definition: test_worker.py:49
def assertIsMsgType(self, socket, message_type, final=True, router=False)
needed_programs
The dict name -> cmd args of the programs to start, needs to be set in each test.
Definition: test_support.py:38
def assertHasOutputFile(self, output_file, unlink=True, timeout=0.5, minimum_delay=0.1)
def send(socket, message_type, first_data=b"", second_data=b"", identity="")
def assertNotHasOutputFile(self, output_file, timeout=0.5)
def assertIsDown(self, name, timeout=5, minimum_delay=0.1)
Definition: test_support.py:93
Definition: main.py:1