Belle II Software  release-05-01-25
b2hlt_unitinfo.py
1 #!/usr/bin/env python3
2 
3 # ************************************************************************#
4 # BASF2 (Belle Analysis Framework 2) #
5 # Copyright(C) 2019 - Belle II Collaboration #
6 # #
7 # Author: The Belle II Collaboration #
8 # Contributors: Nils Braun #
9 # #
10 # This software is provided "as is" without any warranty. #
11 # ************************************************************************#
12 
13 import zmq
14 from argparse import ArgumentParser, FileType
15 from zmq_daq.utils import normalize_addresses, get_monitor_table
16 
17 import os
18 from time import sleep
19 from collections import defaultdict
20 
21 
def get_overview(df):
    """
    Condense the raw monitoring information of a full HLT unit into two overview tables.

    :param df: flat mapping from "."-separated keys to monitored values (anything providing
        ``items()`` and ``len()``, as returned by ``get_monitor_table``), or None. Every key is
        split at "." into the levels of a pandas MultiIndex.
        NOTE(review): the lookups below suggest keys of the form
        ``<address>.<input|output>.<quantity>`` -- confirm against zmq_daq.utils.
    :return: tuple ``(df_to_show, worker_information)`` of DataFrames:

        * ``df_to_show``: one column per first-level key (presumably the monitored address) and
          one row per summary quantity (dead workers, ready queue size, data size, event rate,
          registered workers, raw socket state), missing entries shown as "".
        * ``worker_information``: one row per "from <hostname>" / "to <hostname>" peer, summing
          the per-process columns ``<key>[<hostname>_<pid>]`` of all processes on that host,
          sorted by host name.

        ``(None, None)`` is returned if there is no monitoring data at all.
    """
    if df is None or len(df) == 0:
        # An empty mapping cannot be unstacked below, so treat it like "no data".
        return None, None

    import pandas as pd

    # Rows: first key level; columns: MultiIndex built from the remaining key levels.
    tmp = pd.Series({tuple(key.split(".")): value for key, value in df.items()}).unstack(0).T

    input_df = tmp["input"] if "input" in tmp else None
    output_df = tmp["output"] if "output" in tmp else None
    # Input before output: this order also fixes the order of string concatenation below.
    sides = [side for side in (input_df, output_df) if side is not None]

    def combine_available_sides(column, empty):
        """Add up ``column`` over all sides reporting it (missing -> ``empty``); None if no side does."""
        total = None
        for side in sides:
            if column not in side:
                continue
            values = side[column].fillna(empty)
            total = values if total is None else total + values
        return total

    # Empty frame indexed like tmp; one column per summary quantity is added below.
    df_to_show = pd.DataFrame(columns=tmp.index).T

    if input_df is not None and "dead_workers" in input_df:
        df_to_show["dead workers"] = input_df["dead_workers"]

    if output_df is not None and "ready_queue_size" in output_df:
        df_to_show["ready queue size"] = output_df["ready_queue_size"]

    if input_df is not None and output_df is not None:
        # average between input and output
        for column, label in (("data_size", "data size"), ("event_rate", "event rate")):
            if column in input_df and column in output_df:
                df_to_show[label] = (input_df[column].fillna(0) + output_df[column].fillna(0)) / 2

    # These are usually reported by only one of the two sides, so combine whichever sides
    # report them instead of requiring both.
    for column, label, empty in (("registered_workers", "registered workers", 0),
                                 ("socket_state", "raw socket state", "")):
        combined = combine_available_sides(column, empty)
        if combined is not None:
            df_to_show[label] = combined

    df_to_show = df_to_show.T.fillna("")

    # "<from|to> <hostname>" -> quantity -> summed value
    worker_information = defaultdict(lambda: defaultdict(int))

    def add_information(col, grouped_df, prefix):
        """Fold one per-process column ``<key>[<hostname>_<pid>]`` into worker_information."""
        if "[" not in col or not col.endswith("]"):
            return

        key, _, process_identifier = col[:-1].partition("[")
        # The pid follows the last "_"; the host name itself may contain "_".
        hostname = process_identifier.rsplit("_", 1)[0]

        key = key.replace("_from", "").replace("_to", "")

        if key in ["total_number_messages", "hello_messages"]:
            return

        key = key.replace("received_", "").replace("sent_", "")
        key = key.replace("_", " ")

        values = grouped_df[col].dropna()
        if values.empty:
            # No row reported a value for this process, so there is nothing to add.
            return

        # NOTE(review): only the first non-missing value (first row) of the column is used.
        worker_information[prefix + hostname][key] += values.iloc[0]

        if key == "data size":
            # Count the contributing processes to turn the summed data size into a mean below.
            worker_information[prefix + hostname]["hosts"] += 1

    for prefix, side in (("from ", input_df), ("to ", output_df)):
        if side is not None:
            for col in side.columns:
                add_information(col, side, prefix)

    worker_information = pd.DataFrame(worker_information).T

    for column in ("hosts", "ready messages", "events"):
        if column in worker_information:
            worker_information[column] = pd.to_numeric(worker_information[column], errors="coerce", downcast="integer")
    if "data size" in worker_information and "hosts" in worker_information:
        # mean data size over the processes that reported one
        worker_information["data size"] = worker_information["data size"] / worker_information["hosts"]

    worker_information = worker_information.fillna("")
    # Sort by host name first, then by direction ("from" before "to").
    worker_information = worker_information.loc[sorted(worker_information.index, key=lambda x: x.split(" ")[::-1])]

    return df_to_show, worker_information
105 
106 
if __name__ == '__main__':
    parser = ArgumentParser(
        description="Shortcut to b2hlt_monitor.py to only show the most relevant information for the full HLT unit")
    parser.add_argument(
        "addresses",
        nargs='*',
        help="Monitor the given addresses. " +
        "Valid input formats are 'tcp://<host>:<port>', '<host>:<port>' or just '<port>' in which case localhost is assumed.",
        default=["tcp://hltin:7000", "tcp://hltout:7000"])
    parser.add_argument("--watch", action="store_true", help="Enter watch mode, where the script is called every 1 second.")

    args = parser.parse_args()

    addresses = normalize_addresses(args.addresses)

    # Create and connect all needed sockets. LINGER=0 is set as the context default,
    # so sockets created from it do not linger on close.
    ctx = zmq.Context()
    ctx.setsockopt(zmq.LINGER, 0)
    sockets = {address: ctx.socket(zmq.DEALER) for address in addresses}

    for address, socket in sockets.items():
        socket.connect(address)

    try:
        # Show the tables once; in watch mode, refresh them every second until Ctrl+C.
        while True:
            df = get_monitor_table(sockets, show_detail=True)
            df_to_show, worker_information = get_overview(df)

            if args.watch:
                os.system("clear")
            print(df_to_show)
            print("")
            print(worker_information)

            if not args.watch:
                break
            sleep(1.0)
    except KeyboardInterrupt:
        pass
    finally:
        # Close all sockets created from the context and terminate it.
        ctx.destroy()
zmq_daq.utils
Definition: utils.py:1