Belle II Software release-09-00-07
test_udp_logging.py
1
8
9'''
10This test verifies that logging via a UDP connection works as expected.
11'''
12
13import json
14import socket
15import threading
16import time
17import b2test_utils
18
19
20HOST = "127.0.0.1"
21PORT = 9999
22EXPECTED_MESSAGES = [
23 {
24 "level": "INFO",
25 "message": "This is a INFO message",
26 "variables": {},
27 "module": "",
28 "package": "steering",
29 "function": "send_basf2_messages",
30 "proc": -1,
31 "count": 0,
32 "initialize": True,
33 },
34 {
35 "level": "WARNING",
36 "message": "And this is a WARNING message",
37 "variables": {},
38 "module": "",
39 "package": "steering",
40 "function": "send_basf2_messages",
41 "proc": -1,
42 "count": 1,
43 "initialize": True,
44 },
45]
46
47stop_server = threading.Event()
48received_messages = []
49
50
51def udp_server():
52 """UDP server that collects messages."""
53 with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as server_socket:
54 server_socket.bind((HOST, PORT))
55 server_socket.settimeout(0.5)
56 print(f"[Server] Started listening on {HOST}:{PORT}")
57 while not stop_server.is_set():
58 try:
59 data, addr = server_socket.recvfrom(1024)
60 except socket.timeout:
61 continue
62 message = data.decode()
63 print(f"[Server] Received from {addr}: {message}")
64 received_messages.append(message)
65 print(f'[Server] Stopped listening on {HOST}:{PORT}')
66
67
68def send_basf2_messages():
69 """Send basf2 log messages via UDP."""
70 import basf2
71
72 basf2.logging.add_udp(HOST, PORT)
73 basf2.B2INFO("This is a INFO message")
74 basf2.B2WARNING("And this is a WARNING message")
75
76
77def remove_fields(message_dict, fields):
78 """Return a copy of the dict without the specified fields."""
79 return {k: v for k, v in message_dict.items() if k not in fields}
80
81
82server_thread = threading.Thread(target=udp_server, daemon=True)
83server_thread.start()
84
85time.sleep(0.5)
86b2test_utils.run_in_subprocess(target=send_basf2_messages)
87time.sleep(0.5)
88
89stop_server.set()
90server_thread.join()
91
92for received_message in received_messages:
93 received_dict = json.loads(received_message)
94 normalized_received = remove_fields(received_dict, ["file", "line", "timestamp"])
95 matched = any(normalized_received == expected for expected in EXPECTED_MESSAGES)
96 assert matched, f"Received message not in expected messages: {normalized_received}"
def run_in_subprocess(*args, target, **kwargs)
Definition: __init__.py:221