Belle II Software  release-08-01-10
HistogramTestCase Class Reference
Inheritance diagram for HistogramTestCase:
Collaboration diagram for HistogramTestCase:

Public Member Functions

def setUp (self)
 
def testEventPropagation (self)
 
def check_histogram_repeated (self, file_name, expected_factor)
 
def tearDown (self)
 
def assertIsDown (self, name, timeout=5, minimum_delay=0.1)
 
def assertIsRunning (self, name)
 
def assertMonitoring (self, socket, search_key, search_value, timeout=10)
 
def assertIsAndGet (self, socket, message_type, final=True, router=False)
 
def assertIsMsgType (self, socket, message_type, final=True, router=False)
 
def assertNothingMore (self, socket)
 
def assertHasOutputFile (self, output_file, unlink=True, timeout=0.5, minimum_delay=0.1)
 
def assertNotHasOutputFile (self, output_file, timeout=0.5)
 

Static Public Member Functions

def get_free_port ()
 
def create_socket (port, socket_type=zmq.DEALER, identity="socket", bind=False)
 
def create_router_socket (port)
 
def send (socket, message_type, first_data=b"", second_data=b"", identity="")
 
def recv (socket)
 

Public Attributes

 first_input_port
 first_input_port
 
 first_monitoring_port
 first_monitoring_port
 
 second_input_port
 second_input_port
 
 second_monitoring_port
 second_monitoring_port
 
 final_collector_input_port
 final_collector_input_port
 
 final_collector_monitoring_port
 final_collector_monitoring_port
 
 needed_programs
 needed_programs
 
 test_dir
 use a temporary folder for testing
 
 previous_dir
 remember current working directory
 
 started_programs
 dict for all started programs
 

Static Public Attributes

 histogram_data = open(basf2.find_file("daq/hbasf2/tests/histos.raw"), "br").read()
 histogram_data
 
string event_data
 event_data More...
 
 ctx = zmq.Context()
 The ZMQ context.
 

Private Member Functions

def _is_running (self, name)
 

Detailed Description

Test case

Definition at line 42 of file test_histogram.py.

Member Function Documentation

◆ _is_running()

def _is_running (   self,
  name 
)
privateinherited
Check if a given program is still running.

Definition at line 84 of file test_support.py.

◆ assertHasOutputFile()

def assertHasOutputFile (   self,
  output_file,
  unlink = True,
  timeout = 0.5,
  minimum_delay = 0.1 
)
inherited
Assert that - at least after the given timeout - the output file
is present. If unlink is set to True, remove the file after checking.

Definition at line 227 of file test_support.py.

◆ assertIsAndGet()

def assertIsAndGet (   self,
  socket,
  message_type,
  final = True,
  router = False 
)
inherited
Assert that the next message received on the socket has the given message type.
If final is set to True, also assert that there is no additional message on the socket.
Use router only for router sockets.

Definition at line 200 of file test_support.py.

◆ assertIsDown()

def assertIsDown (   self,
  name,
  timeout = 5,
  minimum_delay = 0.1 
)
inherited
Test helper to assert the given program has terminated - at least after timeout in seconds has passed.
Checks every "minimal_delay seconds.

Definition at line 93 of file test_support.py.

◆ assertIsMsgType()

def assertIsMsgType (   self,
  socket,
  message_type,
  final = True,
  router = False 
)
inherited
Deprecated copy of "assertIsAndGet".

Definition at line 215 of file test_support.py.

◆ assertIsRunning()

def assertIsRunning (   self,
  name 
)
inherited
Assert that a given program is still running.

Definition at line 108 of file test_support.py.

◆ assertMonitoring()

def assertMonitoring (   self,
  socket,
  search_key,
  search_value,
  timeout = 10 
)
inherited
Ask the given socket for a monitoring JSON and make sure, the value related to "search_key"
is set to "search_value" - at least after the given timeout.
The search key can should be in the form "<category>.<key>".

Definition at line 169 of file test_support.py.

◆ assertNotHasOutputFile()

def assertNotHasOutputFile (   self,
  output_file,
  timeout = 0.5 
)
inherited
Assert that after the timeout the given file is not present
(a.k.a. no process has created it)

Definition at line 245 of file test_support.py.

◆ assertNothingMore()

def assertNothingMore (   self,
  socket 
)
inherited
Assert that there is no pending message to be received on the socket.

Definition at line 221 of file test_support.py.

◆ check_histogram_repeated()

def check_histogram_repeated (   self,
  file_name,
  expected_factor 
)
Repeatedly call check_histogram_output 5 times until it is actually fulfilled

Definition at line 203 of file test_histogram.py.

203  def check_histogram_repeated(self, file_name, expected_factor):
204  """Repeatedly call check_histogram_output 5 times until it is actually fulfilled"""
205  tries = 0
206  while tries < 5:
207  self.assertHasOutputFile(file_name, unlink=False, timeout=2)
208  if check_histogram_output(file_name, expected_factor):
209  break
210  tries += 1
211  else:
212  raise AssertionError("Even after retry, the output was not correct!")
213 
214 

◆ create_router_socket()

def create_router_socket (   port)
staticinherited
Shortcut to create a ROUTER type socket with the typical parameters
binding to the given port.

Definition at line 140 of file test_support.py.

◆ create_socket()

def create_socket (   port,
  socket_type = zmq.DEALER,
  identity = "socket",
  bind = False 
)
staticinherited
Create and return a ZMQ socket with the given type and identity and
bind or connect it to localhost and the given port.

Definition at line 115 of file test_support.py.

◆ get_free_port()

def get_free_port ( )
staticinherited
Get a free port number by reusing ZMQ's function for this.

Definition at line 41 of file test_support.py.

◆ recv()

def recv (   socket)
staticinherited
Try to receive a message from the socket (or throw an assertion error if none comes after the set timeout
of the socket).

Definition at line 159 of file test_support.py.

◆ send()

def send (   socket,
  message_type,
  first_data = b"",
  second_data = b"",
  identity = "" 
)
staticinherited
Send a message consisting of the message type, the first and the second data
either to the identity if given or without identity if omitted.

Definition at line 148 of file test_support.py.

◆ setUp()

def setUp (   self)
Setup port numbers and necessary programs

Reimplemented from HLTZMQTestCase.

Definition at line 62 of file test_histogram.py.

◆ tearDown()

def tearDown (   self)
inherited
Custom tearDown function to kill the started programs if still present
and remove the temporary folder again.

Definition at line 70 of file test_support.py.

◆ testEventPropagation()

def testEventPropagation (   self)
test function

Definition at line 103 of file test_histogram.py.

Member Data Documentation

◆ event_data

string event_data
static
Initial value:
= b"""{
"_typename" : "Belle2::EventMetaData",
"fUniqueID" : 0,
"fBits" : 33554432,
"m_event" : 1,
"m_run" : 1,
"m_subrun" : 0,
"m_experiment" : 1,
"m_production" : 0,
"m_time" : 0,
"m_parentLfn" : "",
"m_generatedWeight" : 1,
"m_errorFlag" : 0
}"""

event_data

Definition at line 47 of file test_histogram.py.


The documentation for this class was generated from the following file: