Belle II Software development
BaseCollectorTestCase Class Reference
Inheritance diagram for BaseCollectorTestCase:
HLTZMQTestCase

Public Member Functions

def setUp (self)
 
def create_output_socket (self)
 
def get_signal (self, output_socket, signal_type)
 
def get_event (self, output_socket)
 
def testHelloAndMessageTransmission (self)
 
def testWrongRegistration (self)
 
def testEventPropagation (self)
 

Public Attributes

 input_port
 input_port
 
 output_port
 output_port
 
 monitoring_port
 monitoring_port
 
 needed_programs
 

Static Public Attributes

bool final_collector = False
 final_collector
 

Detailed Description

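Because the collector and the final collector behave almost identically, their tests share this common base test case class; the static attribute final_collector selects which of the two programs is tested.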

Definition at line 254 of file test_support.py.

Member Function Documentation

◆ create_output_socket()

def create_output_socket (   self)
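Create the output socket; how it is set up depends on whether the final collector is being tested. For the final collector, a bound zmq.STREAM socket is created, an empty message is sent and one message is received; otherwise a regular socket is created and a ready ("r") message is sent. The socket is returned.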

Definition at line 283 of file test_support.py.

283 def create_output_socket(self):
284 """create the output socket depending if final collector or not"""
285 if self.final_collector:
286 output_socket = self.create_socket(self.output_port, socket_type=zmq.STREAM, bind=True)
287 output_socket.send(b"")
288 self.recv(output_socket)
289 else:
290 output_socket = self.create_socket(self.output_port)
291 self.send(output_socket, "r")
292 return output_socket
293

◆ get_event()

def get_event (   self,
  output_socket 
)
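Get an event from the socket; the check depends on whether the final collector is being tested. For the final collector, one message is received and it is asserted that nothing more follows; otherwise it is asserted that a message of type "u" arrives.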

Definition at line 301 of file test_support.py.

301 def get_event(self, output_socket):
302 """get an event from the socket depending if final collector or not"""
303 if self.final_collector:
304 self.recv(output_socket)
305 self.assertNothingMore(output_socket)
306 else:
307 self.assertIsMsgType(output_socket, "u")
308

◆ get_signal()

def get_signal (   self,
  output_socket,
  signal_type 
)
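Get a signal from the socket; the check depends on whether the final collector is being tested. For the final collector, it is asserted that nothing arrives on the socket; otherwise it is asserted that a message of the given signal_type arrives (checked with final=True).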

Definition at line 294 of file test_support.py.

294 def get_signal(self, output_socket, signal_type):
295 """get a signal from the socket depending if final collector or not"""
296 if self.final_collector:
297 self.assertNothingMore(output_socket)
298 else:
299 self.assertIsMsgType(output_socket, signal_type, final=True)
300

◆ setUp()

def setUp (   self)
Set up the port numbers and the programs needed for the test.

Reimplemented from HLTZMQTestCase.

Definition at line 261 of file test_support.py.

261 def setUp(self):
262 """Setup port numbers and necessary programs"""
263
264 self.input_port = HLTZMQTestCase.get_free_port()
265
266 self.output_port = HLTZMQTestCase.get_free_port()
267
268 self.monitoring_port = HLTZMQTestCase.get_free_port()
269
270 command = "b2hlt_finalcollector" if self.final_collector else "b2hlt_collector"
271 output = "localhost" if self.final_collector else "*"
272 self.needed_programs = {
273 "collector": [
274 command,
275 "--input", f"tcp://*:{self.input_port}",
276 "--output", f"tcp://{output}:{self.output_port}",
277 "--monitor", f"tcp://*:{self.monitoring_port}"
278 ]
279 }
280 # programs are setup, call parent setup function now
281 super().setUp()
282

◆ testEventPropagation()

def testEventPropagation (   self)
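Test event propagation: events sent by the input workers are passed on to the output workers that have announced themselves as ready, and stop ("l") and terminate ("x") messages are only passed on once both registered input workers have sent them. After the terminate messages the collector is expected to be down.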

Definition at line 473 of file test_support.py.

473 def testEventPropagation(self):
474 """test function"""
475 monitoring_socket = self.create_socket(self.monitoring_port)
476
477 # Send two ready messages from the first socket
478 output_socket = self.create_output_socket()
479 if not self.final_collector:
480 self.send(output_socket, "r")
481
482 self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 2)
483 self.assertMonitoring(monitoring_socket, "output.registered_workers", 1)
484
485 if not self.final_collector:
486 # Send two ready messages from the second socket (the last one is needed to keep the collector listening)
487 second_output_socket = self.create_socket(self.output_port, identity="other_socket")
488 self.send(second_output_socket, "r")
489 self.send(second_output_socket, "r")
490
491 self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 4)
492 self.assertMonitoring(monitoring_socket, "output.registered_workers", 2)
493
494 # Register two workers
495 input_socket = self.create_socket(self.input_port)
496 self.send(input_socket, "h")
497 self.assertIsMsgType(input_socket, "c")
498 self.assertMonitoring(monitoring_socket, "input.registered_workers", 1)
499
500 second_input_socket = self.create_socket(self.input_port, identity="other_socket")
501 self.send(second_input_socket, "h")
502 self.assertIsMsgType(second_input_socket, "c")
503 self.assertMonitoring(monitoring_socket, "input.registered_workers", 2)
504
505 # The first event should go to the first worker
506 self.send(input_socket, "u", b"event data")
507 self.assertIsMsgType(input_socket, "c")
508
509 self.get_event(output_socket)
510 if not self.final_collector:
511 self.assertNothingMore(second_output_socket)
512
513 # The second also
514 self.send(second_input_socket, "u", b"event data")
515 self.assertIsMsgType(second_input_socket, "c")
516
517 self.get_event(output_socket)
518 if not self.final_collector:
519 self.assertNothingMore(second_output_socket)
520
521 # But the third to the second worker
522 self.send(input_socket, "u", b"event data")
523 self.assertIsMsgType(input_socket, "c")
524
525 if not self.final_collector:
526 self.assertNothingMore(output_socket)
527 self.get_event(second_output_socket)
528 else:
529 self.get_event(output_socket)
530
531 # A stop message should be sent to all workers
532 self.send(input_socket, "l")
533 self.assertIsMsgType(input_socket, "c")
534
535 # But only if it is complete...
536 self.assertNothingMore(output_socket)
537 if not self.final_collector:
538 self.assertNothingMore(second_output_socket)
539
540 self.send(second_input_socket, "l")
541 self.assertIsMsgType(second_input_socket, "c")
542
543 self.get_signal(output_socket, "l")
544 if not self.final_collector:
545 self.get_signal(second_output_socket, "l")
546
547 # TODO: Test and implement: should not transmit events after stop
548
549 # As well as a terminate message
550 self.send(input_socket, "x")
551 self.assertIsMsgType(input_socket, "c")
552
553 self.assertNothingMore(output_socket)
554 if not self.final_collector:
555 self.assertNothingMore(second_output_socket)
556
557 self.send(second_input_socket, "x")
558 self.assertIsMsgType(second_input_socket, "c")
559
560 self.get_signal(output_socket, "x")
561 if not self.final_collector:
562 self.get_signal(second_output_socket, "x")
563
564 self.assertIsDown("collector")

◆ testHelloAndMessageTransmission()

def testHelloAndMessageTransmission (   self)
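Test the registration ("h") of input workers and the handling of stop ("l") and terminate ("x") messages: the monitoring counters, the transmission to the output once all registered workers have sent the message, resetting via the monitoring socket ("n"), and unregistering a worker ("d").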

Definition at line 309 of file test_support.py.

309 def testHelloAndMessageTransmission(self):
310 """test function"""
311 monitoring_socket = self.create_socket(self.monitoring_port)
312
313 # Register first worker
314 input_socket = self.create_socket(self.input_port)
315 self.send(input_socket, "h")
316
317 # However the collector is only polling for ready now...
318 self.assertNothingMore(input_socket)
319 self.assertMonitoring(monitoring_socket, "input.registered_workers", 0)
320
321 # To make it actually look for input messages, it needs at least a single worker
322 output_socket = self.create_output_socket()
323
324 # Now we can go on
325 self.assertIsMsgType(input_socket, "c")
326 self.assertMonitoring(monitoring_socket, "input.registered_workers", 1)
327
328 # Register second worker
329 second_input_socket = self.create_socket(self.input_port, identity="other_socket")
330 self.send(second_input_socket, "h")
331 self.assertIsMsgType(second_input_socket, "c")
332 self.assertMonitoring(monitoring_socket, "input.registered_workers", 2)
333
334 # So far no stop messages should be there
335 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 0)
336 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", False)
337
338 # the first stop message should not trigger a transmission
339 self.send(input_socket, "l")
340 self.assertIsMsgType(input_socket, "c")
341 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 1)
342 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", False)
343 self.assertNothingMore(output_socket)
344
345 # The second stop message should
346 self.send(second_input_socket, "l")
347 self.assertIsMsgType(second_input_socket, "c")
348 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 2)
349 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", True)
350
351 self.get_signal(output_socket, "l")
352
353 # Another stop message should not change anything
354 self.send(input_socket, "l")
355 self.assertIsMsgType(input_socket, "c")
356 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 2)
357 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", True)
358 self.assertNothingMore(output_socket)
359
360 # But if we reset...
361 self.send(monitoring_socket, "n")
362 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 0)
363 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", False)
364
365 # .. it should
366 self.send(input_socket, "l")
367 self.assertIsMsgType(input_socket, "c")
368 self.send(second_input_socket, "l")
369 self.assertIsMsgType(second_input_socket, "c")
370
371 self.get_signal(output_socket, "l")
372
373 # Now we reset again
374 self.send(monitoring_socket, "n")
375 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 0)
376 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", False)
377
378 # send just one stop message
379 self.send(input_socket, "l")
380 self.assertIsMsgType(input_socket, "c")
381
382 # and unregister the second
383 self.send(second_input_socket, "d", b"other_socket")
384 self.assertIsMsgType(second_input_socket, "c")
385 self.assertMonitoring(monitoring_socket, "input.registered_workers", 1)
386
387 # which should also give us a stop message
388 self.get_signal(output_socket, "l")
389 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 1)
390 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", True)
391
392 # let's register and reset it again
393 self.send(second_input_socket, "h")
394 self.assertIsMsgType(second_input_socket, "c")
395 self.send(monitoring_socket, "n")
396 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 0)
397 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", False)
398 self.assertMonitoring(monitoring_socket, "input.registered_workers", 2)
399
400 # and try the other way round: first unregister, then send stop
401 self.send(second_input_socket, "d", b"other_socket")
402 self.assertIsMsgType(second_input_socket, "c")
403 self.send(input_socket, "l")
404 self.assertIsMsgType(input_socket, "c")
405 self.assertMonitoring(monitoring_socket, "input.registered_workers", 1)
406 # which should also give us a stop message
407 self.get_signal(output_socket, "l")
408 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 1)
409 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", True)
410
411 # reset the state
412 self.send(second_input_socket, "h")
413 self.assertIsMsgType(second_input_socket, "c")
414 self.send(monitoring_socket, "n")
415 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 0)
416 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", False)
417 self.assertMonitoring(monitoring_socket, "input.registered_workers", 2)
418
419 # The same applies to terminate messages:
420 # Nothing at the beginning
421 self.assertMonitoring(monitoring_socket, "input.received_terminate_messages", 0)
422 self.assertMonitoring(monitoring_socket, "input.all_terminate_messages", False)
423
424 # the first terminate message should not trigger a transmission
425 self.send(input_socket, "x")
426 self.assertIsMsgType(input_socket, "c")
427 self.assertMonitoring(monitoring_socket, "input.received_terminate_messages", 1)
428 self.assertMonitoring(monitoring_socket, "input.all_terminate_messages", False)
429 self.assertNothingMore(output_socket)
430
431 # Another terminate message should not change anything
432 self.send(input_socket, "x")
433 self.assertIsMsgType(input_socket, "c")
434 self.assertMonitoring(monitoring_socket, "input.received_terminate_messages", 1)
435 self.assertMonitoring(monitoring_socket, "input.all_terminate_messages", False)
436 self.assertNothingMore(output_socket)
437
438 # But if we reset...
439 self.send(monitoring_socket, "n")
440 self.assertMonitoring(monitoring_socket, "input.received_terminate_messages", 0)
441 self.assertMonitoring(monitoring_socket, "input.all_terminate_messages", False)
442
443 # ... and send again ...
444 self.send(input_socket, "x")
445 self.assertIsMsgType(input_socket, "c")
446 self.send(second_input_socket, "x")
447 self.assertIsMsgType(second_input_socket, "c")
448
449 self.get_signal(output_socket, "x")
450
451 # ... and the collector should have terminated
452 self.assertIsDown("collector")
453
454 # TODO: test and implement: timeout in wait for stop or terminate messages
455

◆ testWrongRegistration()

def testWrongRegistration (   self)
Test that the collector goes down when a worker that has not registered sends it a message.

Definition at line 456 of file test_support.py.

456 def testWrongRegistration(self):
457 """test function"""
458 # To make it actually look for input messages, it needs at least a single worker
459 self.create_output_socket()
460
461 # Register first worker
462 input_socket = self.create_socket(self.input_port)
463 self.send(input_socket, "h")
464 self.assertIsMsgType(input_socket, "c")
465
466 # Send with a second, unregistered worker
467 second_input_socket = self.create_socket(self.input_port, identity="other_socket")
468 self.send(second_input_socket, "l")
469
470 # The collector should die
471 self.assertIsDown("collector")
472

Member Data Documentation

◆ final_collector

bool final_collector = False
static

final_collector

Definition at line 259 of file test_support.py.

◆ input_port

input_port

input_port

Definition at line 264 of file test_support.py.

◆ monitoring_port

monitoring_port

monitoring_port

Definition at line 268 of file test_support.py.

◆ needed_programs

needed_programs

Definition at line 272 of file test_support.py.

◆ output_port

output_port

output_port

Definition at line 266 of file test_support.py.


The documentation for this class was generated from the following file: