Belle II Software development
HLTTestCase Class Reference
Inheritance diagram for HLTTestCase:
HLTZMQTestCase

Public Member Functions

def setUp (self)
 
def testFullRun (self)
 

Public Attributes

 distributor_input_port
 distributor_input_port
 
 distributor_output_port
 distributor_output_port
 
 distributor_monitoring_port
 distributor_monitoring_port
 
 collector_input_port
 collector_input_port
 
 collector_output_port
 collector_output_port
 
 collector_monitoring_port
 collector_monitoring_port
 
 final_collector_input_port
 final_collector_input_port
 
 final_collector_output_port
 final_collector_output_port
 
 final_collector_monitoring_port
 final_collector_monitoring_port
 
 needed_programs
 needed_programs
 

Static Public Attributes

open event_data = open(basf2.find_file("daq/hbasf2/tests/out.raw"), "br").read()
 event_data
 

Detailed Description

Test case

Definition at line 18 of file test_hlt.py.

Member Function Documentation

◆ setUp()

def setUp (   self)
Setup port numbers and necessary programs

Reimplemented from HLTZMQTestCase.

Definition at line 23 of file test_hlt.py.

23 def setUp(self):
24 """Setup port numbers and necessary programs"""
25
26 self.distributor_input_port = HLTZMQTestCase.get_free_port()
27
28 self.distributor_output_port = HLTZMQTestCase.get_free_port()
29
30 self.distributor_monitoring_port = HLTZMQTestCase.get_free_port()
31
32
33 self.collector_input_port = HLTZMQTestCase.get_free_port()
34
35 self.collector_output_port = HLTZMQTestCase.get_free_port()
36
37 self.collector_monitoring_port = HLTZMQTestCase.get_free_port()
38
39
40 self.final_collector_input_port = HLTZMQTestCase.get_free_port()
41
42 self.final_collector_output_port = HLTZMQTestCase.get_free_port()
43
44 self.final_collector_monitoring_port = HLTZMQTestCase.get_free_port()
45
46
47 self.needed_programs = {
48 "distributor": [
49 "b2hlt_distributor",
50 "--input", f"tcp://localhost:{self.distributor_input_port}",
51 "--output", f"tcp://*:{self.distributor_output_port}",
52 "--monitor", f"tcp://*:{self.distributor_monitoring_port}"
53 ],
54 "collector": [
55 "b2hlt_collector",
56 "--input", f"tcp://*:{self.collector_input_port}",
57 "--output", f"tcp://*:{self.collector_output_port}",
58 "--monitor", f"tcp://*:{self.collector_monitoring_port}"
59 ],
60 "final_collector": [
61 "b2hlt_finalcollector",
62 "--input", f"tcp://*:{self.final_collector_input_port}",
63 "--output", f"tcp://localhost:{self.final_collector_output_port}",
64 "--monitor", f"tcp://*:{self.final_collector_monitoring_port}"
65 ],
66 "worker": [
67 "python3", basf2.find_file("daq/hbasf2/tests/passthrough.no_run_py"),
68 "--input", f"tcp://localhost:{self.distributor_output_port}",
69 "--output", f"tcp://localhost:{self.collector_input_port}"
70 ],
71 "output_worker": [
72 "python3", basf2.find_file("daq/hbasf2/tests/passthrough.no_run_py"),
73 "--prefix", "output_",
74 "--input", f"tcp://localhost:{self.collector_output_port}",
75 "--output", f"tcp://localhost:{self.final_collector_input_port}"
76 ],
77 }
78 super().setUp()
79

◆ testFullRun()

def testFullRun (   self)
test function

Definition at line 80 of file test_hlt.py.

80 def testFullRun(self):
81 """test function"""
82 distributor_monitoring_socket = self.create_socket(self.distributor_monitoring_port)
83 collector_monitoring_socket = self.create_socket(self.collector_monitoring_port)
84 final_collector_monitoring_socket = self.create_socket(self.final_collector_monitoring_port)
85
86 input_socket = self.create_socket(self.distributor_input_port, socket_type=zmq.STREAM, bind=True)
87 input_identity, _ = self.recv(input_socket)
88
89 output_socket = self.create_socket(self.final_collector_output_port, socket_type=zmq.STREAM, bind=True)
90 output_identity, _ = self.recv(output_socket)
91
92 # At the beginning, everything should be at normal state
93 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
94 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
95 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
96 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
97 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 1)
98
99 self.assertMonitoring(collector_monitoring_socket, "input.registered_workers", 1)
100 self.assertMonitoring(collector_monitoring_socket, "output.ready_queue_size", 20)
101 self.assertMonitoring(collector_monitoring_socket, "output.registered_workers", 1)
102
103 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
104
105 self.assertHasOutputFile("initialize_called", timeout=1)
106 self.assertHasOutputFile("output_initialize_called", timeout=1)
107
108 # Now lets try sending some events
109 for _ in range(20):
110 input_socket.send_multipart([input_identity, self.event_data])
111
112 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
113 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
114 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
115 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
116 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 1)
117 self.assertMonitoring(distributor_monitoring_socket, "output.sent_events", 20)
118
119 self.assertMonitoring(collector_monitoring_socket, "input.registered_workers", 1)
120 self.assertMonitoring(collector_monitoring_socket, "input.received_events", 20)
121 self.assertMonitoring(collector_monitoring_socket, "output.ready_queue_size", 20)
122 self.assertMonitoring(collector_monitoring_socket, "output.registered_workers", 1)
123 self.assertMonitoring(collector_monitoring_socket, "output.sent_events", 20)
124
125 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
126 self.assertMonitoring(final_collector_monitoring_socket, "input.received_events", 20)
127 self.assertMonitoring(final_collector_monitoring_socket, "output.sent_events", 20)
128
129 self.assertHasOutputFile("beginrun_called", timeout=1)
130 self.assertHasOutputFile("output_beginrun_called", timeout=1)
131
132 buffer = b""
133 while output_socket.poll(0):
134 _, msg = self.recv(output_socket)
135 buffer += msg
136 self.assertNothingMore(output_socket)
137
138 # Data Size != raw data, as data is in different format, size taken from test itself
139 self.assertEqual(len(buffer), 122638 * 20)
140
141 # Now we stop this run
142 self.send(distributor_monitoring_socket, "l")
143
144 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
145 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
146 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
147 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
148 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 1)
149
150 self.assertMonitoring(collector_monitoring_socket, "input.registered_workers", 1)
151 self.assertMonitoring(collector_monitoring_socket, "input.received_events", 20)
152 self.assertMonitoring(collector_monitoring_socket, "output.ready_queue_size", 20)
153 self.assertMonitoring(collector_monitoring_socket, "output.registered_workers", 1)
154 self.assertMonitoring(collector_monitoring_socket, "input.received_stop_messages", 1)
155 self.assertMonitoring(collector_monitoring_socket, "input.all_stop_messages", 1)
156
157 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
158 self.assertMonitoring(final_collector_monitoring_socket, "input.received_events", 20)
159 self.assertMonitoring(final_collector_monitoring_socket, "input.received_stop_messages", 1)
160 self.assertMonitoring(final_collector_monitoring_socket, "input.all_stop_messages", 1)
161
162 # should go to the workers
163 self.assertHasOutputFile("endrun_called", timeout=1)
164 self.assertHasOutputFile("output_endrun_called", timeout=1)
165
166 # but not to the output
167 self.assertNothingMore(output_socket)
168
169 # a second stop message should only reach the first workers up to the collector
170 self.send(distributor_monitoring_socket, "l")
171
172 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
173 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
174 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
175 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
176 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 1)
177
178 self.assertMonitoring(collector_monitoring_socket, "input.registered_workers", 1)
179 self.assertMonitoring(collector_monitoring_socket, "input.received_events", 20)
180 self.assertMonitoring(collector_monitoring_socket, "output.ready_queue_size", 20)
181 self.assertMonitoring(collector_monitoring_socket, "output.registered_workers", 1)
182 self.assertMonitoring(collector_monitoring_socket, "input.received_stop_messages", 1)
183 self.assertMonitoring(collector_monitoring_socket, "input.all_stop_messages", 1)
184
185 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
186 self.assertMonitoring(final_collector_monitoring_socket, "input.received_events", 20)
187 self.assertMonitoring(final_collector_monitoring_socket, "input.received_stop_messages", 1)
188 self.assertMonitoring(final_collector_monitoring_socket, "input.all_stop_messages", 1)
189
190 # should go to the workers (longer timeout as distributor waits a bit)
191 self.assertHasOutputFile("endrun_called", timeout=5)
192 self.assertNotHasOutputFile("output_endrun_called", timeout=1)
193
194 # Now lets restart the run
195 self.send(distributor_monitoring_socket, "n")
196 self.send(collector_monitoring_socket, "n")
197 self.send(final_collector_monitoring_socket, "n")
198
199 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
200 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
201 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
202 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
203 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 1)
204
205 self.assertMonitoring(collector_monitoring_socket, "input.registered_workers", 1)
206 self.assertMonitoring(collector_monitoring_socket, "input.received_events", 20)
207 self.assertMonitoring(collector_monitoring_socket, "output.ready_queue_size", 20)
208 self.assertMonitoring(collector_monitoring_socket, "output.registered_workers", 1)
209 self.assertMonitoring(collector_monitoring_socket, "input.received_stop_messages", 0)
210 self.assertMonitoring(collector_monitoring_socket, "input.all_stop_messages", 0)
211
212 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
213 self.assertMonitoring(final_collector_monitoring_socket, "input.received_events", 20)
214 self.assertMonitoring(final_collector_monitoring_socket, "input.received_stop_messages", 0)
215 self.assertMonitoring(final_collector_monitoring_socket, "input.all_stop_messages", 0)
216
217 # And send some more events
218 for _ in range(20):
219 input_socket.send_multipart([input_identity, self.event_data])
220
221 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
222 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
223 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
224 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
225 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 1)
226 self.assertMonitoring(distributor_monitoring_socket, "output.sent_events", 40)
227
228 self.assertMonitoring(collector_monitoring_socket, "input.registered_workers", 1)
229 self.assertMonitoring(collector_monitoring_socket, "input.received_events", 40)
230 self.assertMonitoring(collector_monitoring_socket, "output.ready_queue_size", 20)
231 self.assertMonitoring(collector_monitoring_socket, "output.registered_workers", 1)
232 self.assertMonitoring(collector_monitoring_socket, "output.sent_events", 40)
233
234 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
235 self.assertMonitoring(final_collector_monitoring_socket, "input.received_events", 40)
236 self.assertMonitoring(final_collector_monitoring_socket, "output.sent_events", 40)
237
238 self.assertNotHasOutputFile("endrun_called", timeout=1)
239 self.assertNotHasOutputFile("output_endrun_called", timeout=1)
240 self.assertHasOutputFile("beginrun_called", timeout=1)
241 self.assertHasOutputFile("output_beginrun_called", timeout=1)
242
243 buffer = b""
244 while output_socket.poll(0):
245 _, msg = self.recv(output_socket)
246 buffer += msg
247 self.assertNothingMore(output_socket)
248
249 # Data Size != raw data, as data is in different format, size taken from test itself
250 self.assertEqual(len(buffer), 122638 * 20)
251
252 # And finally: terminate everything
253 self.send(distributor_monitoring_socket, "x")
254
255 self.assertIsDown("collector")
256 self.assertIsDown("final_collector")
257 self.assertIsDown("distributor")
258 self.assertIsDown("worker", timeout=1)
259 self.assertIsDown("output_worker")
260
261

Member Data Documentation

◆ collector_input_port

collector_input_port

collector_input_port

Definition at line 33 of file test_hlt.py.

◆ collector_monitoring_port

collector_monitoring_port

collector_monitoring_port

Definition at line 37 of file test_hlt.py.

◆ collector_output_port

collector_output_port

collector_output_port

Definition at line 35 of file test_hlt.py.

◆ distributor_input_port

distributor_input_port

distributor_input_port

Definition at line 26 of file test_hlt.py.

◆ distributor_monitoring_port

distributor_monitoring_port

distributor_monitoring_port

Definition at line 30 of file test_hlt.py.

◆ distributor_output_port

distributor_output_port

distributor_output_port

Definition at line 28 of file test_hlt.py.

◆ event_data

open event_data = open(basf2.find_file("daq/hbasf2/tests/out.raw"), "br").read()
static

event_data

Definition at line 21 of file test_hlt.py.

◆ final_collector_input_port

final_collector_input_port

final_collector_input_port

Definition at line 40 of file test_hlt.py.

◆ final_collector_monitoring_port

final_collector_monitoring_port

final_collector_monitoring_port

Definition at line 44 of file test_hlt.py.

◆ final_collector_output_port

final_collector_output_port

final_collector_output_port

Definition at line 42 of file test_hlt.py.

◆ needed_programs

needed_programs

needed_programs

Definition at line 47 of file test_hlt.py.


The documentation for this class was generated from the following file: