Belle II Software  release-08-01-10
rfarmutil.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 
4 
11 
12 import os
13 import sys
14 import subprocess
15 import time
16 
17 
18 # Basic Utilities
19 # Get full path of the configuration file
20 
def get_configpath(conffile):
    """Return the full path of a configuration file.

    The path is built as ``$RFARM_CONFDIR/<conffile>.conf``.

    Args:
        conffile: Base name of the configuration file, without the
            ``.conf`` extension.

    Returns:
        The string ``<RFARM_CONFDIR>/<conffile>.conf``.

    Raises:
        SystemExit: If the ``RFARM_CONFDIR`` environment variable is not
            set. A message is printed first and the exit status is 1.
    """
    confdir = os.environ.get('RFARM_CONFDIR')
    # Test for a missing variable directly: comparing str(value) against
    # 'None' would also reject a directory that is literally named "None".
    if confdir is None:
        print('RFARM_CONFDIR is not defined. Exit.')
        # Exit with a non-zero status so calling scripts can see the failure.
        sys.exit(1)
    return confdir + '/' + conffile + '.conf'
28 
29 
30 # Get Configuration from config file
def get_rfgetconf(conffile, item1, item2='NULL', item3='NULL'):
    """Query a configuration value through the ``rfgetconf`` command.

    Runs ``rfgetconf <config path> <item1> <item2> <item3>`` in a shell and
    returns whatever the command wrote to standard output.

    Args:
        conffile: Base name of the configuration file (see get_configpath).
        item1: First key passed to ``rfgetconf``.
        item2: Second key passed to ``rfgetconf``; defaults to ``'NULL'``.
        item3: Third key passed to ``rfgetconf``; defaults to ``'NULL'``.

    Returns:
        The standard output of ``rfgetconf`` decoded to text. The value is
        returned as-is (no stripping), so callers can concatenate it with
        other strings or pass it to ``int()``.

    Raises:
        SystemExit: Raised by get_configpath when ``RFARM_CONFDIR`` is not
            defined.
    """
    # get_configpath already validates RFARM_CONFDIR, so no separate check
    # is needed here.
    cmd = 'rfgetconf ' + get_configpath(conffile) + ' ' + item1 + ' ' + item2 \
        + ' ' + item3
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    # communicate() drains both pipes while waiting; calling wait() before
    # reading can block forever once a pipe buffer fills up.
    output, _ = p.communicate()
    # The pipe yields bytes; every caller combines the value with str.
    return output.decode()
45 
46 
47 # print("waiting")
48 # confout = p.stdout.read()
49 
50 # NSMD related utilities
51 # Run NSMD
52 
def run_nsmd(nsmdir, port, nsmhost):
    """Launch ``nsmd2`` on a host over ssh.

    A per-host directory ``<nsmdir>/<nsmhost>`` is created locally if it
    does not exist yet; the remote command changes into that directory and
    points ``NSMLOGDIR`` at it before starting ``nsmd2``.

    Args:
        nsmdir: Base directory holding one sub-directory per host.
        port: Port number as a string, passed to both ``-p`` and ``-s``.
        nsmhost: Host to ssh into; also passed to ``nsmd2 -h``.
    """
    # Check the directory used for logging
    logdir = nsmdir + '/' + nsmhost
    if not os.path.exists(logdir):
        os.mkdir(logdir)

    daemon = ''.join(['nsmd2 -f -p ', port, ' -s ', port, ' -h ', nsmhost])
    # The remote command uses csh-style `setenv`.
    remote = ''.join(['cd ', logdir, '; setenv NSMLOGDIR ', logdir, ';',
                      daemon])
    cmd = 'ssh ' + nsmhost + ' "' + remote + '"'

    # The ssh process is left running; only pause briefly before returning.
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
66 
67 
68 # p.wait()
69 
70 # Kill NSMD
71 
def kill_nsmd(port, nsmhost):
    """Kill the ``nsmd2`` processes on a host that match a port.

    Over ssh, lists ``nsmd2`` processes with ``ps -fC nsmd2``, keeps the
    lines matching ``port`` with ``grep`` and extracts the second column
    with ``awk``. Each positive integer obtained this way is treated as a
    PID and killed with a separate ``ssh <host> "kill <pid>"``.

    Args:
        port: Port number as a string, used as the ``grep`` pattern.
        nsmhost: Host to ssh into.
    """
    cmd = 'ssh ' + nsmhost + ' "ps -fC nsmd2 | grep ' + port \
        + "| awk '{print \\$2}' \""
    # Read the PID list through a pipe instead of a fixed-name scratch file
    # in the current directory: nothing is left behind, and concurrent
    # invocations cannot overwrite each other's list.
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
    listing, _ = p.communicate()

    for token in listing.decode().split():
        try:
            pid = int(token)
        except ValueError:
            # Skip anything that is not a number rather than aborting.
            continue
        if pid > 0:
            cmd = 'ssh ' + nsmhost + ' "kill ' + str(pid) + '"'
            p = subprocess.Popen(cmd, shell=True)
            p.wait()
86 
87 
88 # Start NSMD on all nodes
89 
def start_nsmd(conffile):
    """Start ``nsmd`` on every node named in the configuration.

    Covers, in order: the control node, the event server node (skipped
    when its host name occurs inside the control host name), the output
    server node, and every event processor node whose two-digit id does
    not occur in the configured bad list.

    Args:
        conffile: Base name of the configuration file.
    """
    # Global parameters
    nsmdir = get_rfgetconf(conffile, 'system', 'nsmdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')

    def launch(host):
        run_nsmd(nsmdir, port, host)
        print('nsmd on %s started' % host)

    # Control node
    ctlhost = get_rfgetconf(conffile, 'master', 'ctlhost')
    launch(ctlhost)

    # Event server node, unless its name is contained in the control host
    evshost = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    if evshost not in ctlhost:
        launch(evshost)

    # Output server node
    launch(get_rfgetconf(conffile, 'collector', 'ctlhost'))

    # Event processor nodes
    nnodes = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    first = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    badlist = get_rfgetconf(conffile, 'processor', 'badlist')
    hostbase = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    for number in range(first, first + nnodes):
        nodeid = '%2.2d' % number
        if nodeid in badlist:
            continue
        launch(hostbase + nodeid)
122 
123 
def stop_nsmd(conffile):
    """Kill ``nsmd`` on every node named in the configuration.

    Visits the same nodes in the same order as start_nsmd: the control
    node, the event server node (skipped when its host name occurs inside
    the control host name), the output server node, and every event
    processor node whose two-digit id does not occur in the bad list.

    Args:
        conffile: Base name of the configuration file.
    """
    port = get_rfgetconf(conffile, 'system', 'nsmport')

    def halt(host):
        kill_nsmd(port, host)
        print('nsmd on %s stopped' % host)

    # Kill nsmd on the control node
    ctlhost = get_rfgetconf(conffile, 'master', 'ctlhost')
    halt(ctlhost)

    # Kill nsmd on the event server node
    evshost = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    if evshost not in ctlhost:
        halt(evshost)

    # Kill nsmd on the output server node
    halt(get_rfgetconf(conffile, 'collector', 'ctlhost'))

    # Kill nsmd on the event processor nodes
    nnodes = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    first = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    badlist = get_rfgetconf(conffile, 'processor', 'badlist')
    hostbase = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    for number in range(first, first + nnodes):
        nodeid = '%2.2d' % number
        if nodeid in badlist:
            continue
        halt(hostbase + nodeid)
154 
155 
156 # RFARM server operations
157 # Run eventserver
158 
def run_eventserver(conffile):
    """Start ``rf_eventserver`` on the distributor host over ssh.

    Creates ``<execdir_base>/distributor`` locally when missing, then runs
    the server remotely from ``execdir_base`` with ``NSM2_PORT`` set and
    its output redirected to ``distributor/nsmlog.log``.

    Args:
        conffile: Base name of the configuration file.
    """
    evshost = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')

    workdir = basedir + '/distributor'
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    remote = ''.join([
        'cd ', basedir,
        '; setenv NSM2_PORT ', port,
        '; rf_eventserver ', get_configpath(conffile),
        ' > & distributor/nsmlog.log',
    ])
    cmd = 'ssh ' + evshost + ' "' + remote + '" '
    print(cmd)
    # The ssh process is left running in the background.
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
171 
172 
173 # Stop eventserver
174 
def stop_eventserver(conffile):
    """Stop the event server and remove its ring buffer and shared memory.

    Reads PIDs from the local file ``<execdir_base>/distributor/pid.data``
    and, for each one, runs over ssh on the distributor host:
    ``kill <pid>; removerb <unit>:<ringbuffer>; removeshm <unit>:distributor``.

    Args:
        conffile: Base name of the configuration file.

    Raises:
        OSError: If the pid file cannot be opened.
    """
    evshost = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    unit = get_rfgetconf(conffile, 'system', 'unitname')
    ringbuf = get_rfgetconf(conffile, 'distributor', 'ringbuffer')
    rbufname = unit + ':' + ringbuf
    shmname = unit + ':distributor'

    pidfile = basedir + '/distributor/pid.data'
    # Read everything up front so the file is closed before any ssh runs.
    # Stripping removes the trailing newline that would otherwise land in
    # the middle of the remote command; blank lines are dropped.
    with open(pidfile, 'r') as handle:
        pids = [line.strip() for line in handle if line.strip()]

    for pid in pids:
        cmd = 'ssh ' + evshost + ' "kill ' + pid + '; removerb ' + rbufname \
            + "; removeshm " + shmname + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
192 
193 
194 # Run outputserver
195 
def run_outputserver(conffile):
    """Start ``rf_outputserver`` on the collector host over ssh.

    Creates ``<execdir_base>/collector`` locally when missing, then runs
    the server remotely from ``execdir_base`` with ``NSM2_PORT`` set and
    its output redirected to ``collector/nsmlog.log``.

    Args:
        conffile: Base name of the configuration file.
    """
    opshost = get_rfgetconf(conffile, 'collector', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')

    workdir = basedir + '/collector'
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    remote = ''.join([
        'cd ', basedir,
        '; setenv NSM2_PORT ', port,
        '; rf_outputserver ', get_configpath(conffile),
        ' > & collector/nsmlog.log',
    ])
    cmd = 'ssh ' + opshost + ' "' + remote + '" '
    print(cmd)
    # The ssh process is left running in the background.
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
208 
209 
210 # Stop outputserver
211 
def stop_outputserver(conffile):
    """Stop the output server and clean up its IPC resources.

    Reads PIDs from the local file ``<execdir_base>/collector/pid.data``
    and, for each one, runs over ssh on the collector host: ``kill <pid>``,
    ``removerb`` for the input and output ring buffers, ``removeshm`` for
    ``<unit>:collector`` and finally ``clear_basf2_ipc``.

    Args:
        conffile: Base name of the configuration file.

    Raises:
        OSError: If the pid file cannot be opened.
    """
    opshost = get_rfgetconf(conffile, 'collector', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    unit = get_rfgetconf(conffile, 'system', 'unitname')
    rbufin = get_rfgetconf(conffile, 'collector', 'ringbufin')
    rbufout = get_rfgetconf(conffile, 'collector', 'ringbufout')
    rbufinname = unit + ':' + rbufin
    rbufoutname = unit + ':' + rbufout
    shmname = unit + ':collector'

    pidfile = basedir + '/collector/pid.data'
    # Read everything up front so the file is closed before any ssh runs.
    # Stripping removes the trailing newline that would otherwise land in
    # the middle of the remote command; blank lines are dropped.
    with open(pidfile, 'r') as handle:
        pids = [line.strip() for line in handle if line.strip()]

    for pid in pids:
        cmd = 'ssh ' + opshost + ' "kill ' + pid + '; removerb ' + rbufinname \
            + '; removerb ' + rbufoutname + '; removeshm ' + shmname \
            + '; clear_basf2_ipc"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
231 
232 
233 # Start event processor
234 
def run_eventprocessor(conffile):
    """Start ``rf_eventprocessor`` on every usable processor node.

    Node ids run from ``idbase`` over ``nnodes`` consecutive values,
    formatted as two digits; ids that occur in ``badlist`` are skipped.
    For each remaining node the local directory
    ``<execdir_base>/evp_<nodebase><id>`` is created when missing and the
    processor is started over ssh on ``<ctlhostbase><id>`` with its output
    redirected to ``evp_<nodebase><id>/nsmlog.log``.

    Args:
        conffile: Base name of the configuration file.
    """
    hostbase = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    nodebase = get_rfgetconf(conffile, 'processor', 'nodebase')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')
    nnodes = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    badlist = get_rfgetconf(conffile, 'processor', 'badlist')
    # 'idbase' is looked up once only: the former second lookup spawned an
    # extra rfgetconf process just to fill an unused local named `id`.

    for i in range(procid, procid + nnodes):
        nodeid = '%2.2d' % i
        if badlist.find(nodeid) != -1:
            continue
        evphost = hostbase + nodeid
        nodename = nodebase + nodeid
        workdir = basedir + '/evp_' + nodename
        if not os.path.exists(workdir):
            os.mkdir(workdir)
        cmd = 'ssh ' + evphost + ' "cd ' + basedir + '; setenv NSM2_PORT ' \
            + port + '; rf_eventprocessor ' + get_configpath(conffile) \
            + ' > & evp_' + nodename + '/nsmlog.log" '
        print(cmd)
        # The ssh process is left running; pause before the next node.
        subprocess.Popen(cmd, shell=True)
        time.sleep(1)
258 
259 
260 # Stop event processor
261 
def stop_eventprocessor(conffile):
    """Stop the event processors and clean up their IPC resources.

    Node ids run from ``idbase`` over ``nnodes`` consecutive values,
    formatted as two digits; ids that occur in ``badlist`` are skipped.
    For each remaining node, PIDs are read from the local file
    ``<execdir_base>/evp_<nodebase><id>/pid.data`` and, for each PID, the
    following runs over ssh on ``<ctlhostbase><id>``: ``kill <pid>``,
    ``removerb`` for the collector input and output ring buffers,
    ``removeshm`` for ``<unit>:evp_<nodebase><id>`` and ``clear_basf2_ipc``.

    Args:
        conffile: Base name of the configuration file.

    Raises:
        OSError: If a pid file cannot be opened.
    """
    hostbase = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    nodebase = get_rfgetconf(conffile, 'processor', 'nodebase')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    nnodes = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    badlist = get_rfgetconf(conffile, 'processor', 'badlist')
    # The unused 'nsmport' lookup and the duplicate 'idbase' lookup were
    # dropped; each one spawned an rfgetconf process for nothing.

    unit = get_rfgetconf(conffile, 'system', 'unitname')
    rbufin = get_rfgetconf(conffile, 'collector', 'ringbufin')
    rbufout = get_rfgetconf(conffile, 'collector', 'ringbufout')
    rbufinname = unit + ':' + rbufin
    rbufoutname = unit + ':' + rbufout

    for i in range(procid, procid + nnodes):
        nodeid = '%2.2d' % i
        if badlist.find(nodeid) != -1:
            continue
        evphost = hostbase + nodeid
        nodename = 'evp_' + nodebase + nodeid
        shmname = unit + ':' + nodename
        print(shmname)

        pidfile = basedir + '/' + nodename + '/pid.data'
        # Read everything up front so the file is closed before any ssh
        # runs. Stripping removes the trailing newline that would otherwise
        # land in the middle of the remote command; blank lines are dropped.
        with open(pidfile, 'r') as handle:
            pids = [line.strip() for line in handle if line.strip()]

        for pid in pids:
            cmd = 'ssh ' + evphost + ' "kill ' + pid + '; removerb ' \
                + rbufinname + '; removerb ' + rbufoutname \
                + '; removeshm ' + shmname + '; clear_basf2_ipc"'
            print(cmd)
            p = subprocess.Popen(cmd, shell=True)
            p.wait()
298 
299 
300 # Run dqmserver
301 
def run_dqmserver(conffile):
    """Start ``rf_dqmserver`` on the DQM server host over ssh.

    Creates ``<execdir_base>/dqmserver`` locally when missing, then runs
    the server remotely from ``execdir_base`` with ``NSM2_PORT`` set and
    its output redirected to ``dqmserver/nsmlog.log``.

    Args:
        conffile: Base name of the configuration file.
    """
    dqmhost = get_rfgetconf(conffile, 'dqmserver', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')

    workdir = basedir + '/dqmserver'
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    remote = ''.join([
        'cd ', basedir,
        '; setenv NSM2_PORT ', port,
        '; rf_dqmserver ', get_configpath(conffile),
        ' > & dqmserver/nsmlog.log',
    ])
    cmd = 'ssh ' + dqmhost + ' "' + remote + '" '
    print(cmd)
    # The ssh process is left running in the background.
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
314 
315 
316 # Stop dqmserver
317 
def stop_dqmserver(conffile):
    """Stop the DQM server.

    Reads PIDs from the local file ``<execdir_base>/dqmserver/pid.data``
    and runs ``kill <pid>`` over ssh on the DQM server host for each one.

    Args:
        conffile: Base name of the configuration file.

    Raises:
        OSError: If the pid file cannot be opened.
    """
    dqmhost = get_rfgetconf(conffile, 'dqmserver', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')

    pidfile = basedir + '/dqmserver/pid.data'
    # Read everything up front so the file is closed before any ssh runs.
    # Stripping removes the trailing newline that would otherwise end up
    # inside the quoted remote command; blank lines are dropped.
    with open(pidfile, 'r') as handle:
        pids = [line.strip() for line in handle if line.strip()]

    for pid in pids:
        cmd = 'ssh ' + dqmhost + ' "kill ' + pid + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
329 
330 
331 # Run roisender
332 
def run_roisender(conffile):
    """Start ``rf_roisender`` on the ROI sender host over ssh.

    Creates ``<execdir_base>/roisender`` locally when missing, then runs
    the sender remotely from ``execdir_base`` with ``NSM2_PORT`` set and
    its output redirected to ``roisender/nsmlog.log``.

    Args:
        conffile: Base name of the configuration file.
    """
    roihost = get_rfgetconf(conffile, 'roisender', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')

    workdir = basedir + '/roisender'
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    remote = ''.join([
        'cd ', basedir,
        '; setenv NSM2_PORT ', port,
        '; rf_roisender ', get_configpath(conffile),
        ' > & roisender/nsmlog.log',
    ])
    cmd = 'ssh ' + roihost + ' "' + remote + '" '
    print(cmd)
    # The ssh process is left running in the background.
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
345 
346 
347 # Stop roisender
348 
def stop_roisender(conffile):
    """Stop the ROI sender and remove its shared memory.

    Reads PIDs from the local file ``<execdir_base>/roisender/pid.data``
    and, for each one, runs ``kill <pid>; removeshm <unit>:roisender`` over
    ssh on the ROI sender host.

    Args:
        conffile: Base name of the configuration file.

    Raises:
        OSError: If the pid file cannot be opened.
    """
    roihost = get_rfgetconf(conffile, 'roisender', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    unit = get_rfgetconf(conffile, 'system', 'unitname')
    shmname = unit + ':roisender'

    pidfile = basedir + '/roisender/pid.data'
    # Read everything up front so the file is closed before any ssh runs.
    # Stripping removes the trailing newline that would otherwise land in
    # the middle of the remote command; blank lines are dropped.
    with open(pidfile, 'r') as handle:
        pids = [line.strip() for line in handle if line.strip()]

    for pid in pids:
        cmd = 'ssh ' + roihost + ' "kill ' + pid + '; removeshm ' + shmname + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
362 
363 
364 # Run local master
365 
def run_master(conffile):
    """Start ``rf_master_local`` on the master host over ssh.

    Creates ``<execdir_base>/master`` locally when missing, then runs the
    master remotely from ``execdir_base`` with ``NSM2_PORT`` set and its
    output redirected to ``master/nsmlog.log``.

    Args:
        conffile: Base name of the configuration file.
    """
    masterhost = get_rfgetconf(conffile, 'master', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')

    workdir = basedir + '/master'
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    remote = ''.join([
        'cd ', basedir,
        '; setenv NSM2_PORT ', port,
        '; rf_master_local ', get_configpath(conffile),
        ' > & master/nsmlog.log',
    ])
    cmd = 'ssh ' + masterhost + ' "' + remote + '" '
    print(cmd)
    # The ssh process is left running in the background.
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
378 
379 
380 # Stop local master
381 
def stop_master(conffile):
    """Stop the local master.

    Reads PIDs from the local file ``<execdir_base>/master/pid.data`` and
    runs ``kill <pid>`` over ssh on the master host for each one.

    Args:
        conffile: Base name of the configuration file.

    Raises:
        OSError: If the pid file cannot be opened.
    """
    masterhost = get_rfgetconf(conffile, 'master', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')

    pidfile = basedir + '/master/pid.data'
    # Read everything up front so the file is closed before any ssh runs.
    # Stripping removes the trailing newline that would otherwise end up
    # inside the quoted remote command; blank lines are dropped.
    with open(pidfile, 'r') as handle:
        pids = [line.strip() for line in handle if line.strip()]

    for pid in pids:
        cmd = 'ssh ' + masterhost + ' "kill ' + pid + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
393 
394 
def start_rfarm_components(conffile):
    """Start all RFARM components except the master.

    Launch order: event processors, output server, event server,
    DQM server, ROI sender.

    Args:
        conffile: Base name of the configuration file.
    """
    launchers = (
        run_eventprocessor,
        run_outputserver,
        run_eventserver,
        run_dqmserver,
        run_roisender,
    )
    for launch in launchers:
        launch(conffile)
401 
402 
403 # Stop RFARM components
404 
def stop_rfarm_components(conffile):
    """Stop the ROI sender, DQM server, event server and output server.

    Event processors are not stopped here; stop_rfarm_local stops them
    before calling this function.

    Args:
        conffile: Base name of the configuration file.
    """
    stop_roisender(conffile)

    print("stop dqmserver")
    stop_dqmserver(conffile)
    print("done")

    for halt in (stop_eventserver, stop_outputserver):
        halt(conffile)
413 
414 
415 # Start RFARM local operation
416 
def start_rfarm_local(conffile):
    """Start RFARM local operation: all components first, then the master.

    Args:
        conffile: Base name of the configuration file.
    """
    for bring_up in (start_rfarm_components, run_master):
        bring_up(conffile)
420 
421 
422 # Stop RFARM local operation
423 
def stop_rfarm_local(conffile):
    """Stop RFARM local operation.

    Stops the event processors, then the remaining components, then the
    master, printing a progress message before each step.

    Args:
        conffile: Base name of the configuration file.
    """
    steps = (
        ("stopping eventprocessors", stop_eventprocessor),
        ("stopping rfarm components", stop_rfarm_components),
        ("stopping master", stop_master),
    )
    for banner, halt in steps:
        print(banner)
        halt(conffile)