4 from datetime
import datetime
6 import matplotlib.pyplot
as plt
10 from time
import sleep
13 def _receive(socket, filtering=True):
14 """Internal helper function to ask a socket for monitoring and get the answer as JSON"""
15 _, message, _ = socket.recv_multipart()
16 message = json.loads(message.decode())
19 if isinstance(value, str):
24 for category, dictionary
in message.items():
25 for key, value
in dictionary.items():
26 yield f
"{category}.{key}", normalize(value)
def _get_monitor_table_impl(sockets, show_detail):
    """Generator executing the actual monitoring request and yielding the answers.

    A request is first sent to every socket in ``sockets``; afterwards the
    answers are collected socket by socket through ``_receive``. Every
    yielded item is a pair ``("<name>.<key>", value)`` where ``<name>`` is
    the key of the socket in ``sockets``. Unless ``show_detail`` is true,
    entries whose key contains ``"["`` or ``"last_measurement"`` are omitted.
    """
    # Send all requests up front, before waiting for any answer.
    for connection in sockets.values():
        connection.send_multipart([b"m", b"", b""])

    for name, connection in sockets.items():
        for key, value in _receive(connection):
            # Detailed entries are only passed on when explicitly requested.
            if not show_detail and ("[" in key or "last_measurement" in key):
                continue
            yield f"{name}.{key}", value
def get_monitor_table(sockets, show_detail):
    """
    Ask the given sockets for their monitoring JSON and return
    a dictionary with all answers (each key starts with the socket address).
    The additional flag show_detail controls how many details of the returned
    JSON are included.
    """
    entries = _get_monitor_table_impl(sockets=sockets, show_detail=show_detail)
    return dict(entries)
50 def show_monitoring(df, clear=False):
52 Print the monitoring data produced by "get_monitor_table"
53 in a human readable form to the console.
55 tmp = pd.Series({tuple(key.split(
".")): value
for key, value
in df.items()}).unstack(0)
57 pd.set_option(
"max_rows", len(tmp))
63 def normalize_addresses(addresses):
65 Convert a list of addresses into a normalized format, where
66 each address starts with "tcp://", includes a hostname (if not given)
68 Also removed duplicates.
69 Useful when user input is processed.
71 addresses = [
"tcp://localhost:" + address
if address.isdigit()
else address
for address
in addresses]
72 addresses = [
"tcp://" + address
if "tcp://" not in address
else address
for address
in addresses]
73 addresses = set(addresses)
78 def write_monitoring(df, f):
80 Using the data produced by "get_monitor_table" dump it
81 to disk in the msgpack data format.
82 You will need to have msgpack installed for this.
83 Adds the current time as an additional column
84 Attention: this is the time of the function call which might or might not
85 correspond to the time the values were extracted.
90 raise ValueError(
"Please install msgpack with pip to use this functionality")
91 df[
"time"] = str(datetime.now())
92 f.write(msgpack.packb(df, use_bin_type=
True))
95 def load_measurement_file(file_name):
97 Load a measurement data file produced by
99 b2hlt_monitor.py --dat
101 and create a pandas dataframe out of it.
102 You need to have msgpack installed for this.
103 Automatically converts the stored time into a timedelta index
104 with the first measurements defined as 0.
105 See the jupyter notebook on how to use this function.
110 raise ValueError(
"Please install msgpack with pip to use this functionality")
112 with open(file_name,
"rb")
as f:
113 unpacker = msgpack.Unpacker(f, raw=
False)
114 df = pd.DataFrame(list(unpacker))
115 df = df.set_index(
"time")
116 df.index = pd.to_datetime(df.index)
117 df.index = (df.index - df.index[0]).total_seconds()
118 df.columns = df.columns.map(
lambda x: tuple(x.split(
".")))