Belle II Software  release-08-01-10
b2hlt_unitinfo.py
1 #!/usr/bin/env python3
2 
3 
10 
11 import zmq
12 from argparse import ArgumentParser
13 from zmq_daq.utils import normalize_addresses, get_monitor_table
14 
15 import os
16 from time import sleep
17 from collections import defaultdict
18 
19 
def get_overview(df):
    """
    Condense the flat monitoring dictionary into two summary tables.

    Every key of ``df`` is split at the dots. The first part labels one
    column of the first returned table; the remaining parts are looked up
    as the group (``input`` or ``output``) and the entry name.
    NOTE(review): presumably the first part is the address of the queried
    socket as produced by ``get_monitor_table`` -- confirm in zmq_daq.utils.

    Entries named ``<name>[<hostname>_<pid>]`` are treated as per-process
    details. They are summed per hostname, separately for the input group
    (row label ``from <hostname>``) and the output group (``to <hostname>``).

    @param df: mapping from dotted key to monitored value, or None
    @return tuple ``(df_to_show, worker_information)`` of pandas DataFrames:
            ``df_to_show`` has one row per summary quantity, and
            ``worker_information`` has one row per ``from``/``to`` hostname.
            Missing values are replaced by empty strings in both.
            ``(None, None)`` is returned if ``df`` is None or empty.
    """
    # Nothing to summarize. An empty mapping is handled like "no answer",
    # because unstack() cannot work without a MultiIndex and would raise.
    if df is None or len(df) == 0:
        return None, None

    # pandas is only imported when there is actually something to show
    import pandas as pd
    tmp = pd.Series({tuple(key.split(".")): value for key, value in df.items()}).unstack(0).T

    # Empty frame sharing the row labels of tmp, filled column by column below
    df_to_show = pd.DataFrame(columns=tmp.index).T

    if "input" in tmp and "dead_workers" in tmp["input"]:
        df_to_show["dead workers"] = tmp["input"]["dead_workers"]

    if "output" in tmp and "ready_queue_size" in tmp["output"]:
        df_to_show["ready queue size"] = tmp["output"]["ready_queue_size"]

    if "input" in tmp and "output" in tmp:
        input_df = tmp["input"]
        output_df = tmp["output"]

        # average between input and output
        if "data_size" in input_df and "data_size" in output_df:
            df_to_show["data size"] = (input_df["data_size"].fillna(0) + output_df["data_size"].fillna(0)) / 2

        if "event_rate" in input_df and "event_rate" in output_df:
            df_to_show["event rate"] = (input_df["event_rate"].fillna(0) + output_df["event_rate"].fillna(0)) / 2

        # this actually only exists for one, so we fill the other with empty data
        if "registered_workers" in input_df and "registered_workers" in output_df:
            df_to_show["registered workers"] = (
                input_df["registered_workers"].fillna(0) + output_df["registered_workers"].fillna(0))

        if "socket_state" in input_df and "socket_state" in output_df:
            df_to_show["raw socket state"] = input_df["socket_state"].fillna("") + output_df["socket_state"].fillna("")

    # One row per summary quantity, blanks instead of NaN
    df_to_show = df_to_show.T.fillna("")

    # {"from <hostname>" / "to <hostname>": {quantity: summed value}}
    worker_information = defaultdict(lambda: defaultdict(int))

    def add_information(col, grouped_df, prefix):
        """Add the detail column ``col`` of ``grouped_df`` to the sums of its host."""
        # Only "<name>[<identifier>]" columns carry per-process details
        if "[" not in col or not col.endswith("]"):
            return

        # partition() never raises, unlike unpacking the result of split()
        key, _, process_identifier = col[:-1].partition("[")
        # Everything up to the first "_" is the hostname; an identifier
        # without "_" is used as hostname as a whole
        hostname = process_identifier.split("_", 1)[0]

        key = key.replace("_from", "").replace("_to", "")

        if key in ["total_number_messages", "hello_messages"]:
            return

        key = key.replace("received_", "").replace("sent_", "")
        key = key.replace("_", " ")

        # Use the first available value of the column; skip columns that
        # only contain missing values instead of failing on iloc[0]
        values = grouped_df[col].dropna()
        if values.empty:
            return

        worker_information[prefix + hostname][key] += values.iloc[0]

        # Count the summed data size entries to turn the sum into a mean later
        if key == "data size":
            worker_information[prefix + hostname]["hosts"] += 1

    if "input" in tmp:
        grouped_input = tmp["input"]
        for col in grouped_input.columns:
            add_information(col, grouped_input, "from ")

    if "output" in tmp:
        grouped_output = tmp["output"]
        for col in grouped_output.columns:
            add_information(col, grouped_output, "to ")

    worker_information = pd.DataFrame(worker_information).T

    # Show counters as integers where possible
    for counter in ("hosts", "ready messages", "events"):
        if counter in worker_information:
            worker_information[counter] = pd.to_numeric(
                worker_information[counter], errors="coerce", downcast="integer")

    # The data size was summed over all entries of a host: show the mean
    if "data size" in worker_information and "hosts" in worker_information:
        worker_information["data size"] = worker_information["data size"] / worker_information["hosts"]

    worker_information = worker_information.fillna("")
    # Sort by hostname first and by direction ("from"/"to") second
    worker_information = worker_information.loc[sorted(worker_information.index, key=lambda x: x.split(" ")[::-1])]

    return df_to_show, worker_information
103 
104 
if __name__ == '__main__':
    # Command line interface: optional list of addresses and a watch flag
    parser = ArgumentParser(
        description="Shortcut to b2hlt_monitor.py to only show the most relevant information for the full HLT unit")
    parser.add_argument(
        "addresses",
        nargs='*',
        help="Monitor the given addresses. " +
        "Valid input formats are 'tcp://<host>:<port>', '<host>:<port>' or just '<port>' in which case localhost is assumed.",
        default=["tcp://hltin:7000", "tcp://hltout:7000"])
    parser.add_argument("--watch", action="store_true", help="Enter watch mode, where the script is called every 1 second.")

    args = parser.parse_args()

    addresses = normalize_addresses(args.addresses)

    # Sockets created from this context do not linger on close
    ctx = zmq.Context()
    ctx.setsockopt(zmq.LINGER, 0)

    try:
        # Create and connect all needed sockets
        sockets = {address: ctx.socket(zmq.DEALER) for address in addresses}

        for address, socket in sockets.items():
            socket.connect(address)

        # Without --watch the tables are shown exactly once,
        # with --watch they are refreshed every second until interrupted
        while True:
            df = get_monitor_table(sockets, show_detail=True)
            df_to_show, worker_information = get_overview(df)

            if args.watch:
                os.system("clear")
            print(df_to_show)
            print("")
            print(worker_information)

            if not args.watch:
                break

            sleep(1.0)
    except KeyboardInterrupt:
        # Ctrl-C is the regular way to leave the watch mode
        pass
    finally:
        # Close all sockets and terminate the context on every exit path
        ctx.destroy(linger=0)