Belle II Software  release-05-01-25
utils.py
1 import pandas as pd
2 import json
3 import zmq
4 from datetime import datetime
5 
6 import matplotlib.pyplot as plt
7 import numpy as np
8 
9 import os
10 from time import sleep
11 
12 
13 def _receive(socket, filtering=True):
14  """Internal helper function to ask a socket for monitoring and get the answer as JSON"""
15  _, message, _ = socket.recv_multipart()
16  message = json.loads(message.decode())
17 
18  def normalize(value):
19  if isinstance(value, str):
20  return value.strip()
21  else:
22  return value
23 
24  for category, dictionary in message.items():
25  for key, value in dictionary.items():
26  yield f"{category}.{key}", normalize(value)
27 
28 
29 def _get_monitor_table_impl(sockets, show_detail):
30  """Functor for executing the actual monitoring request and returning the JSON"""
31  for socket in sockets.values():
32  socket.send_multipart([b"m", b"", b""])
33 
34  for name, socket in sockets.items():
35  for key, value in _receive(socket):
36  if show_detail or ("[" not in key and "last_measurement" not in key):
37  yield f"{name}.{key}", value
38 
39 
40 def get_monitor_table(sockets, show_detail):
41  """
42  Ask the given sockets for their monitoring JSON and return
43  a dictionary with each answers (the socket address as the key).
44  The additional flag show_detail controls how many details of the returned
45  JSON should be included.
46  """
47  return dict(_get_monitor_table_impl(sockets=sockets, show_detail=show_detail))
48 
49 
50 def show_monitoring(df, clear=False):
51  """
52  Print the monitoring data produced by "get_monitor_table"
53  in a human readable form to the console.
54  """
55  tmp = pd.Series({tuple(key.split(".")): value for key, value in df.items()}).unstack(0)
56  tmp = tmp.fillna("-")
57  pd.set_option("max_rows", len(tmp))
58  if clear:
59  os.system("clear")
60  print(tmp)
61 
62 
63 def normalize_addresses(addresses):
64  """
65  Convert a list of addresses into a normalized format, where
66  each address starts with "tcp://", includes a hostname (if not given)
67  and a port number.
68  Also removed duplicates.
69  Useful when user input is processed.
70  """
71  addresses = ["tcp://localhost:" + address if address.isdigit() else address for address in addresses]
72  addresses = ["tcp://" + address if "tcp://" not in address else address for address in addresses]
73  addresses = set(addresses)
74 
75  return addresses
76 
77 
78 def write_monitoring(df, f):
79  """
80  Using the data produced by "get_monitor_table" dump it
81  to disk in the msgpack data format.
82  You will need to have msgpack installed for this.
83  Adds the current time as an additional column
84  Attention: this is the time of the function call which might or might not
85  correspond to the time the values were extracted.
86  """
87  try:
88  import msgpack
89  except ImportError:
90  raise ValueError("Please install msgpack with pip to use this functionality")
91  df["time"] = str(datetime.now())
92  f.write(msgpack.packb(df, use_bin_type=True))
93 
94 
95 def load_measurement_file(file_name):
96  """
97  Load a measurement data file produced by
98 
99  b2hlt_monitor.py --dat
100 
101  and create a pandas dataframe out of it.
102  You need to have msgpack installed for this.
103  Automatically converts the stored time into a timedelta index
104  with the first measurements defined as 0.
105  See the jupyter notebook on how to use this function.
106  """
107  try:
108  import msgpack
109  except ImportError:
110  raise ValueError("Please install msgpack with pip to use this functionality")
111 
112  with open(file_name, "rb") as f:
113  unpacker = msgpack.Unpacker(f, raw=False)
114  df = pd.DataFrame(list(unpacker))
115  df = df.set_index("time")
116  df.index = pd.to_datetime(df.index)
117  df.index = (df.index - df.index[0]).total_seconds()
118  df.columns = df.columns.map(lambda x: tuple(x.split(".")))
119 
120  return df