Belle II Software  release-05-01-25
test_distributor.py
1 import atexit
2 import os
3 from unittest import main
4 import basf2
5 
6 import zmq
7 
8 from zmq_daq.test_support import HLTZMQTestCase
9 
10 
12  """Test case"""
13 
14  event_data = open(basf2.find_file("daq/hbasf2/tests/out.raw"), "br").read()
15 
16  def setUp(self):
17  """Setup port numbers and necessary programs"""
18 
19  self.input_port = HLTZMQTestCase.get_free_port()
20 
21  self.output_port = HLTZMQTestCase.get_free_port()
22 
23  self.monitoring_port = HLTZMQTestCase.get_free_port()
24 
25  self.needed_programs = {
26  "distributor": [
27  "b2hlt_distributor",
28  "--input", f"tcp://localhost:{self.input_port}",
29  "--output", f"tcp://*:{self.output_port}",
30  "--monitor", f"tcp://*:{self.monitoring_port}",
31  "--stopWaitingTime", "1"]}
32  super().setUp()
33 
35  """test function"""
36  monitoring_socket = self.create_socket(self.monitoring_port)
37 
38  self.assertMonitoring(monitoring_socket, "input.socket_state", "disconnected")
39  self.assertMonitoring(monitoring_socket, "input.socket_connects", 0)
40  self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 0)
41 
42  # To make it actually look for input messages, it needs at least a single worker
43  output_socket = self.create_socket(self.output_port)
44  for _ in range(5):
45  self.send(output_socket, "r")
46 
47  # Now we can go on
48  input_socket = self.create_socket(self.input_port, socket_type=zmq.STREAM, bind=True)
49 
50  # Still no connection
51  self.assertMonitoring(monitoring_socket, "input.socket_state", "disconnected")
52  self.assertMonitoring(monitoring_socket, "input.socket_connects", 0)
53  self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 0)
54 
55  # This should initiate a connection
56  identity, _ = self.recv(input_socket)
57 
58  self.assertMonitoring(monitoring_socket, "input.socket_state", "connected")
59  self.assertMonitoring(monitoring_socket, "input.socket_connects", 1)
60  self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 0)
61 
62  # And we can also close it again
63  input_socket.close()
64 
65  self.assertMonitoring(monitoring_socket, "input.socket_state", "disconnected")
66  self.assertMonitoring(monitoring_socket, "input.socket_connects", 1)
67  self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 1)
68 
69  # or open
70  input_socket = self.create_socket(self.input_port, socket_type=zmq.STREAM, bind=True)
71  identity, _ = self.recv(input_socket)
72 
73  self.assertMonitoring(monitoring_socket, "input.socket_state", "connected")
74  self.assertMonitoring(monitoring_socket, "input.socket_connects", 2)
75  self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 1)
76 
77  # sending an event should not change anything
78  input_socket.send_multipart([identity, self.event_data])
79  input_socket.send_multipart([identity, self.event_data])
80  input_socket.send_multipart([identity, self.event_data])
81  input_socket.send_multipart([identity, self.event_data])
82 
83  self.assertMonitoring(monitoring_socket, "input.received_events", 4)
84 
85  self.assertMonitoring(monitoring_socket, "input.socket_state", "connected")
86  self.assertMonitoring(monitoring_socket, "input.socket_connects", 2)
87  self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 1)
88 
89  input_socket.close()
90 
91  self.assertMonitoring(monitoring_socket, "input.socket_state", "disconnected")
92  self.assertMonitoring(monitoring_socket, "input.socket_connects", 2)
93  self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 2)
94 
95  def testEvents(self):
96  """test function"""
97  monitoring_socket = self.create_socket(self.monitoring_port)
98 
99  # connect the input
100  input_socket = self.create_socket(self.input_port, socket_type=zmq.STREAM, bind=True)
101  identity, _ = self.recv(input_socket)
102 
103  # Start two workers
104  output_socket = self.create_socket(self.output_port)
105  second_output_socket = self.create_socket(self.output_port, identity="other_socket")
106 
107  self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 0)
108  self.assertMonitoring(monitoring_socket, "output.registered_workers", 0)
109 
110  # Every ready should give us an event at exactly this worker, when there is an event
111 
112  # 1. only send ready message
113  self.send(output_socket, "r")
114 
115  # No event
116  self.assertNothingMore(output_socket)
117  self.assertNothingMore(second_output_socket)
118 
119  # But ready worker
120  self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 1)
121  self.assertMonitoring(monitoring_socket, "output.registered_workers", 1)
122  self.assertMonitoring(monitoring_socket, "output.ready_messages[socket]", 1)
123  self.assertMonitoring(monitoring_socket, "input.socket_state", "connected")
124  self.assertMonitoring(monitoring_socket, "input.socket_connects", 1)
125  self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 0)
126 
127  # Now the event
128  input_socket.send_multipart([identity, self.event_data])
129 
130  self.assertIsMsgType(output_socket, "u")
131  self.assertNothingMore(second_output_socket)
132 
133  # And no ready worker
134  self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 0)
135  self.assertMonitoring(monitoring_socket, "output.registered_workers", 1)
136  self.assertMonitoring(monitoring_socket, "output.ready_messages[socket]", 0)
137  self.assertMonitoring(monitoring_socket, "input.socket_state", "connected")
138  self.assertMonitoring(monitoring_socket, "input.socket_connects", 1)
139  self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 0)
140 
141  # 2. Try out sending the event first
142  input_socket.send_multipart([identity, self.event_data])
143 
144  # Still no event
145  self.assertNothingMore(output_socket)
146  self.assertNothingMore(second_output_socket)
147 
148  # And no updated monitoring
149  self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 0)
150  self.assertMonitoring(monitoring_socket, "output.registered_workers", 1)
151  self.assertMonitoring(monitoring_socket, "output.ready_messages[socket]", 0)
152  self.assertMonitoring(monitoring_socket, "input.socket_state", "connected")
153  self.assertMonitoring(monitoring_socket, "input.socket_connects", 1)
154  self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 0)
155 
156  # but with a ready message
157  self.send(second_output_socket, "r")
158 
159  self.assertNothingMore(output_socket)
160  self.assertIsMsgType(second_output_socket, "u")
161 
162  self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 0)
163  self.assertMonitoring(monitoring_socket, "output.registered_workers", 2)
164  self.assertMonitoring(monitoring_socket, "input.socket_state", "connected")
165  self.assertMonitoring(monitoring_socket, "input.socket_connects", 1)
166  self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 0)
167 
168  # and again
169  input_socket.send_multipart([identity, self.event_data])
170 
171  self.send(second_output_socket, "r")
172  self.assertNothingMore(output_socket)
173  self.assertIsMsgType(second_output_socket, "u")
174 
175  # As we have answered all ready messages, nothing should be in the queue
176  self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 0)
177  self.assertMonitoring(monitoring_socket, "output.registered_workers", 2)
178 
179  # until we start sending more ready messages
180  self.send(output_socket, "r")
181  self.send(output_socket, "r")
182  self.send(second_output_socket, "r")
183  self.send(second_output_socket, "r")
184 
185  self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 4)
186  self.assertMonitoring(monitoring_socket, "output.registered_workers", 2)
187 
188  def testEndRun(self):
189  """test function"""
190  monitoring_socket = self.create_socket(self.monitoring_port)
191 
192  # Start two workers
193  output_socket = self.create_socket(self.output_port)
194  second_output_socket = self.create_socket(self.output_port, identity="other_socket")
195 
196  self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 0)
197  self.assertMonitoring(monitoring_socket, "output.registered_workers", 0)
198 
199  # and register them by sending a ready
200  self.send(output_socket, "r")
201  self.send(second_output_socket, "r")
202 
203  self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 2)
204  self.assertMonitoring(monitoring_socket, "output.registered_workers", 2)
205 
206  # a stop run should be sent to all workers
207  self.send(monitoring_socket, "l")
208 
209  self.assertIsMsgType(output_socket, "l")
210  self.assertIsMsgType(second_output_socket, "l")
211 
212  # also multiple times
213  self.send(monitoring_socket, "l")
214 
215  self.assertIsMsgType(output_socket, "l")
216  self.assertIsMsgType(second_output_socket, "l")
217 
218  # and if there are events in between they should not be transported
219  input_socket = self.create_socket(self.input_port, socket_type=zmq.STREAM, bind=True)
220  identity, _ = self.recv(input_socket)
221  input_socket.send_multipart([identity, self.event_data])
222  input_socket.send_multipart([identity, self.event_data])
223 
224  self.assertNothingMore(output_socket)
225  self.assertNothingMore(second_output_socket)
226 
227  # it shouldn't matter
228  self.send(monitoring_socket, "l")
229 
230  self.assertIsMsgType(output_socket, "l")
231  self.assertIsMsgType(second_output_socket, "l")
232 
233  # A terminate message should also be sent
234  self.send(monitoring_socket, "x")
235 
236  self.assertIsMsgType(output_socket, "x")
237  self.assertIsMsgType(second_output_socket, "x")
238 
239  # and the distributor should go down
240  self.assertIsDown("distributor")
241 
242 
243 if __name__ == '__main__':
244  main()
zmq_daq.test_support.HLTZMQTestCase.assertNothingMore
def assertNothingMore(self, socket)
Definition: test_support.py:209
zmq_daq.test_support.HLTZMQTestCase
Definition: test_support.py:17
zmq_daq.test_support.HLTZMQTestCase.needed_programs
needed_programs
The dict name -> cmd args of the programs to start, needs to be set in each test.
Definition: test_support.py:26
zmq_daq.test_support.HLTZMQTestCase.assertIsDown
def assertIsDown(self, name, timeout=5, minimum_delay=0.1)
Definition: test_support.py:81
zmq_daq.test_support.HLTZMQTestCase.recv
def recv(socket)
Definition: test_support.py:147
zmq_daq.test_support.HLTZMQTestCase.assertIsMsgType
def assertIsMsgType(self, socket, message_type, final=True, router=False)
Definition: test_support.py:203
zmq_daq.test_support.HLTZMQTestCase.create_socket
def create_socket(port, socket_type=zmq.DEALER, identity="socket", bind=False)
Definition: test_support.py:103
test_distributor.DistributorTestCase.input_port
input_port
input_port
Definition: test_distributor.py:19
test_distributor.DistributorTestCase.monitoring_port
monitoring_port
monitoring_port
Definition: test_distributor.py:23
main
int main(int argc, char **argv)
Run all tests.
Definition: test_main.cc:77
zmq_daq.test_support.HLTZMQTestCase.send
def send(socket, message_type, first_data=b"", second_data=b"", identity="")
Definition: test_support.py:136
test_distributor.DistributorTestCase.output_port
output_port
output_port
Definition: test_distributor.py:21
test_distributor.DistributorTestCase.testEvents
def testEvents(self)
Definition: test_distributor.py:95
zmq_daq.test_support
Definition: test_support.py:1
test_distributor.DistributorTestCase.testConnectAndDisconnect
def testConnectAndDisconnect(self)
Definition: test_distributor.py:34
test_distributor.DistributorTestCase.setUp
def setUp(self)
Definition: test_distributor.py:16
test_distributor.DistributorTestCase.testEndRun
def testEndRun(self)
Definition: test_distributor.py:188
zmq_daq.test_support.HLTZMQTestCase.assertMonitoring
def assertMonitoring(self, socket, search_key, search_value, timeout=10)
Definition: test_support.py:157
test_distributor.DistributorTestCase
Definition: test_distributor.py:11
test_distributor.DistributorTestCase.event_data
event_data
event_data
Definition: test_distributor.py:14