Belle II Software development
DyingHLTTestCase Class Reference
Inheritance diagram for DyingHLTTestCase:
HLTZMQTestCase

Public Member Functions

def setUp (self)
 
def testFullRun (self)
 

Public Attributes

 distributor_input_port
 distributor_input_port
 
 distributor_output_port
 distributor_output_port
 
 distributor_monitoring_port
 distributor_monitoring_port
 
 final_collector_input_port
 final_collector_input_port
 
 final_collector_output_port
 final_collector_output_port
 
 final_collector_monitoring_port
 final_collector_monitoring_port
 
 needed_programs
 needed_programs
 

Static Public Attributes

event_data = open(basf2.find_file("daq/hbasf2/tests/out.raw"), "br").read()
 event_data
 

Detailed Description

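Test case for an HLT setup (distributor, final collector and two workers) in which one of the workers is killed on purpose during the run.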

Definition at line 262 of file test_hlt.py.

Member Function Documentation

◆ setUp()

def setUp (   self)
Setup port numbers and necessary programs

Reimplemented from HLTZMQTestCase.

Definition at line 267 of file test_hlt.py.

267 def setUp(self):
268 """Setup port numbers and necessary programs"""
269
270 self.distributor_input_port = HLTZMQTestCase.get_free_port()
271
272 self.distributor_output_port = HLTZMQTestCase.get_free_port()
273
274 self.distributor_monitoring_port = HLTZMQTestCase.get_free_port()
275
276
277 self.final_collector_input_port = HLTZMQTestCase.get_free_port()
278
279 self.final_collector_output_port = HLTZMQTestCase.get_free_port()
280
281 self.final_collector_monitoring_port = HLTZMQTestCase.get_free_port()
282
283
284 self.needed_programs = {
285 "distributor": [
286 "b2hlt_distributor",
287 "--input", f"tcp://localhost:{self.distributor_input_port}",
288 "--output", f"tcp://*:{self.distributor_output_port}",
289 "--monitor", f"tcp://*:{self.distributor_monitoring_port}"
290 ],
291 "final_collector": [
292 "b2hlt_finalcollector", "--input", f"tcp://*:{self.final_collector_input_port}",
293 "--output", f"tcp://localhost:{self.final_collector_output_port}",
294 "--monitor", f"tcp://*:{self.final_collector_monitoring_port}"
295 ],
296 "worker": [
297 "python3", basf2.find_file("daq/hbasf2/tests/passthrough.no_run_py"),
298 "--input", f"tcp://localhost:{self.distributor_output_port}",
299 "--output", f"tcp://localhost:{self.final_collector_input_port}"
300 ],
301 "dying_worker": [
302 "python3", basf2.find_file("daq/hbasf2/tests/passthrough.no_run_py"),
303 "--prefix", "dying_", "--exit",
304 "--input", f"tcp://localhost:{self.distributor_output_port}",
305 "--output", f"tcp://localhost:{self.final_collector_input_port}"
306 ],
307 }
308 super().setUp()
309

◆ testFullRun()

def testFullRun (   self)
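Test a full run with a dying worker: send events through the distributor, workers and final collector, kill one worker on purpose, check that the remaining worker still transports events, then stop and restart the run and finally terminate all programs.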

Definition at line 310 of file test_hlt.py.

310 def testFullRun(self):
311 """test function"""
312 distributor_monitoring_socket = self.create_socket(self.distributor_monitoring_port)
313 final_collector_monitoring_socket = self.create_socket(self.final_collector_monitoring_port)
314
315 input_socket = self.create_socket(self.distributor_input_port, socket_type=zmq.STREAM, bind=True)
316 input_identity, _ = self.recv(input_socket)
317
318 output_socket = self.create_socket(self.final_collector_output_port, socket_type=zmq.STREAM, bind=True)
319 output_identity, _ = self.recv(output_socket)
320
321 # At the beginning, everything should be in its normal state
322 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
323 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
324 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
325 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 40)
326 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 2)
327
328 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 2)
329
330 self.assertHasOutputFile("initialize_called", timeout=1)
331 self.assertHasOutputFile("dying_initialize_called", timeout=1)
332
333 # Now let's try sending some events
334 for _ in range(100):
335 input_socket.send_multipart([input_identity, self.event_data])
336
337 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
338 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
339 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
340 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 40)
341 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 2)
342 self.assertMonitoring(distributor_monitoring_socket, "output.sent_events", 100)
343
344 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 2)
345 self.assertMonitoring(final_collector_monitoring_socket, "input.received_events", 100)
346 self.assertMonitoring(final_collector_monitoring_socket, "output.sent_events", 100)
347
348 self.assertHasOutputFile("beginrun_called", timeout=1)
349 self.assertHasOutputFile("dying_beginrun_called", timeout=1)
350
351 buffer = b""
352 while output_socket.poll(0):
353 _, msg = self.recv(output_socket)
354 buffer += msg
355 self.assertNothingMore(output_socket)
356
357 # Data Size != raw data, as data is in different format, size taken from test itself
358 self.assertEqual(len(buffer), 122638 * 100)
359
360 # Now we kill one of the workers on purpose
361 Path("dying_exit_request").touch()
362
363 # And send some more events
364 for _ in range(100):
365 input_socket.send_multipart([input_identity, self.event_data])
366
367 self.assertHasOutputFile("dying_exit_called", timeout=1)
368 self.assertIsDown("dying_worker", timeout=10)
369
370 buffer = b""
371 while output_socket.poll(0):
372 _, msg = self.recv(output_socket)
373 buffer += msg
374 self.assertNothingMore(output_socket)
375
376 # We expect to have at least some events (but some will be missing)!
377 # 100 - 20 (queue size of dying worker) - 1 (the event that "caused" the problem) = 79
378 self.assertEqual(len(buffer), 122638 * 79)
379
380 # The collector should have it removed
381 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
382
383 # Also a stop signal should be transported correctly
384 self.send(distributor_monitoring_socket, "l")
385
386 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
387 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
388 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
389 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
390 # the distributor does not know about the dying worker, but this is also no problem
391 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 2)
392 self.assertMonitoring(distributor_monitoring_socket, "output.sent_events", 200)
393
394 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
395 self.assertMonitoring(final_collector_monitoring_socket, "input.received_events", 179)
396 self.assertMonitoring(final_collector_monitoring_socket, "input.received_stop_messages", 1)
397 self.assertMonitoring(final_collector_monitoring_socket, "input.all_stop_messages", 1)
398
399 # should go to the workers
400 self.assertHasOutputFile("endrun_called", timeout=1)
401
402 # Now let's restart the run
403 self.send(distributor_monitoring_socket, "n")
404 self.send(final_collector_monitoring_socket, "n")
405
406 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
407 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
408 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
409 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
410 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 2)
411
412 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
413 self.assertMonitoring(final_collector_monitoring_socket, "input.received_events", 179)
414 self.assertMonitoring(final_collector_monitoring_socket, "input.received_stop_messages", 0)
415 self.assertMonitoring(final_collector_monitoring_socket, "input.all_stop_messages", 0)
416
417 for _ in range(100):
418 input_socket.send_multipart([input_identity, self.event_data])
419
420 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
421 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
422 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
423 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
424 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 2)
425 self.assertMonitoring(distributor_monitoring_socket, "output.sent_events", 300)
426
427 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
428 self.assertMonitoring(final_collector_monitoring_socket, "input.received_events", 279)
429 self.assertMonitoring(final_collector_monitoring_socket, "output.sent_events", 279)
430
431 self.assertNotHasOutputFile("endrun_called", timeout=1)
432 self.assertHasOutputFile("beginrun_called", timeout=1)
433
434 buffer = b""
435 while output_socket.poll(0):
436 _, msg = self.recv(output_socket)
437 buffer += msg
438 self.assertNothingMore(output_socket)
439
440 # Now all events should be transported again
441 self.assertEqual(len(buffer), 122638 * 100)
442
443 # And finally: terminate everything
444 self.send(distributor_monitoring_socket, "x")
445
446 self.assertIsDown("final_collector")
447 self.assertIsDown("distributor")
448 self.assertIsDown("worker")
449
450

Member Data Documentation

◆ distributor_input_port

distributor_input_port

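Port for the distributor's input, obtained from HLTZMQTestCase.get_free_port(). It is passed to b2hlt_distributor as "--input tcp://localhost:<port>"; the test binds a STREAM socket to it to send events.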

Definition at line 270 of file test_hlt.py.

◆ distributor_monitoring_port

distributor_monitoring_port

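Port for the distributor's monitoring socket, obtained from HLTZMQTestCase.get_free_port() and passed to b2hlt_distributor as "--monitor tcp://*:<port>".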

Definition at line 274 of file test_hlt.py.

◆ distributor_output_port

distributor_output_port

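Port for the distributor's output, obtained from HLTZMQTestCase.get_free_port(). It is passed to b2hlt_distributor as "--output tcp://*:<port>" and to both workers as their "--input".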

Definition at line 272 of file test_hlt.py.

◆ event_data

event_data = open(basf2.find_file("daq/hbasf2/tests/out.raw"), "br").read()
static

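Raw event data read in binary mode from daq/hbasf2/tests/out.raw; the test sends it as the payload of every event.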

Definition at line 265 of file test_hlt.py.

◆ final_collector_input_port

final_collector_input_port

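Port for the final collector's input, obtained from HLTZMQTestCase.get_free_port(). It is passed to b2hlt_finalcollector as "--input tcp://*:<port>" and to both workers as their "--output".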

Definition at line 277 of file test_hlt.py.

◆ final_collector_monitoring_port

final_collector_monitoring_port

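Port for the final collector's monitoring socket, obtained from HLTZMQTestCase.get_free_port() and passed to b2hlt_finalcollector as "--monitor tcp://*:<port>".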

Definition at line 281 of file test_hlt.py.

◆ final_collector_output_port

final_collector_output_port

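Port for the final collector's output, obtained from HLTZMQTestCase.get_free_port(). It is passed to b2hlt_finalcollector as "--output tcp://localhost:<port>"; the test binds a STREAM socket to it to read the transported events.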

Definition at line 279 of file test_hlt.py.

◆ needed_programs

needed_programs

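Dictionary mapping a program name ("distributor", "final_collector", "worker", "dying_worker") to the command line, given as a list of arguments, of that program needed by the test.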

Definition at line 284 of file test_hlt.py.


The documentation for this class was generated from the following file: