Belle II Software development
utils.py
1
8
9import pandas as pd
10import json
11from datetime import datetime
12import os
13
14
15def _receive(socket, filtering=True):
16 """Internal helper function to ask a socket for monitoring and get the answer as JSON"""
17 _, message, _ = socket.recv_multipart()
18 message = json.loads(message.decode())
19
20 def normalize(value):
21 if isinstance(value, str):
22 return value.strip()
23 else:
24 return value
25
26 for category, dictionary in message.items():
27 for key, value in dictionary.items():
28 yield f"{category}.{key}", normalize(value)
29
30
31def _get_monitor_table_impl(sockets, show_detail):
32 """Functor for executing the actual monitoring request and returning the JSON"""
33 for socket in sockets.values():
34 socket.send_multipart([b"m", b"", b""])
35
36 for name, socket in sockets.items():
37 for key, value in _receive(socket):
38 if show_detail or ("[" not in key and "last_measurement" not in key):
39 yield f"{name}.{key}", value
40
41
42def get_monitor_table(sockets, show_detail):
43 """
44 Ask the given sockets for their monitoring JSON and return
45 a dictionary with each answers (the socket address as the key).
46 The additional flag show_detail controls how many details of the returned
47 JSON should be included.
48 """
49 return dict(_get_monitor_table_impl(sockets=sockets, show_detail=show_detail))
50
51
52def show_monitoring(df, clear=False):
53 """
54 Print the monitoring data produced by "get_monitor_table"
55 in a human readable form to the console.
56 """
57 tmp = pd.Series({tuple(key.split(".")): value for key, value in df.items()}).unstack(0)
58 tmp = tmp.fillna("-")
59 pd.set_option("max_rows", len(tmp))
60 if clear:
61 os.system("clear")
62 print(tmp)
63
64
65def normalize_addresses(addresses):
66 """
67 Convert a list of addresses into a normalized format, where
68 each address starts with "tcp://", includes a hostname (if not given)
69 and a port number.
70 Also removed duplicates.
71 Useful when user input is processed.
72 """
73 addresses = ["tcp://localhost:" + address if address.isdigit() else address for address in addresses]
74 addresses = ["tcp://" + address if "tcp://" not in address else address for address in addresses]
75 addresses = set(addresses)
76
77 return addresses
78
79
80def write_monitoring(df, f):
81 """
82 Using the data produced by "get_monitor_table" dump it
83 to disk in the msgpack data format.
84 You will need to have msgpack installed for this.
85 Adds the current time as an additional column
86 Attention: this is the time of the function call which might or might not
87 correspond to the time the values were extracted.
88 """
89 try:
90 import msgpack
91 except ImportError:
92 raise ValueError("Please install msgpack with pip to use this functionality")
93 df["time"] = str(datetime.now())
94 f.write(msgpack.packb(df, use_bin_type=True))
95
96
97def load_measurement_file(file_name):
98 """
99 Load a measurement data file produced by
100
101 b2hlt_monitor.py --dat
102
103 and create a pandas dataframe out of it.
104 You need to have msgpack installed for this.
105 Automatically converts the stored time into a timedelta index
106 with the first measurements defined as 0.
107 See the jupyter notebook on how to use this function.
108 """
109 try:
110 import msgpack
111 except ImportError:
112 raise ValueError("Please install msgpack with pip to use this functionality")
113
114 with open(file_name, "rb") as f:
115 unpacker = msgpack.Unpacker(f, raw=False)
116 df = pd.DataFrame(list(unpacker))
117 df = df.set_index("time")
118 df.index = pd.to_datetime(df.index)
119 df.index = (df.index - df.index[0]).total_seconds()
120 df.columns = df.columns.map(lambda x: tuple(x.split(".")))
121
122 return df