Belle II Software  release-06-00-14
rfarmutil.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 
4 
11 
12 import os
13 import sys
14 import subprocess
15 import signal
16 import socket
17 import time
18 
19 
20 # Basic Utilities
21 # Get full path of the configuration file
22 
def get_configpath(conffile):
    """Return the full path of a configuration file.

    The path is built as ``$RFARM_CONFDIR/<conffile>.conf``.

    Args:
        conffile: Configuration name without the ``.conf`` suffix.

    Returns:
        The path of the configuration file as a string.

    Raises:
        SystemExit: With status 1 when RFARM_CONFDIR is not set.
    """
    confdir = os.environ.get('RFARM_CONFDIR')
    if confdir is None:
        print('RFARM_CONFDIR is not defined. Exit.')
        # Non-zero status so that a calling script can detect the failure.
        sys.exit(1)
    return confdir + '/' + conffile + '.conf'
30 
31 
32 # Get Configuration from config file
def get_rfgetconf(conffile, item1, item2='NULL', item3='NULL'):
    """Read one value from a configuration file with the ``rfgetconf`` tool.

    Runs ``rfgetconf <configpath> <item1> <item2> <item3>`` through the
    shell and returns whatever the tool wrote to standard output.

    Args:
        conffile: Configuration name, resolved by get_configpath().
        item1: First key passed to rfgetconf.
        item2: Second key passed to rfgetconf ('NULL' by default).
        item3: Third key passed to rfgetconf ('NULL' by default).

    Returns:
        The standard output of rfgetconf, unmodified.

    Raises:
        SystemExit: Raised by get_configpath() when RFARM_CONFDIR is unset.
    """
    # get_configpath() already aborts when RFARM_CONFDIR is undefined, so the
    # check is not repeated here.
    cmd = ' '.join(['rfgetconf', get_configpath(conffile), item1, item2,
                    item3])
    # universal_newlines gives text (str) output on Python 3 as well.
    proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE, universal_newlines=True)
    # communicate() drains both pipes while waiting; calling wait() first can
    # block forever once the child fills a pipe buffer.
    output, _ = proc.communicate()
    return output
47 
48 
49 # print "waiting"
50 # confout = p.stdout.read()
51 
52 # NSMD related utilities
53 # Run NSMD
54 
def run_nsmd(nsmdir, port, nsmhost):
    """Launch nsmd2 on a host through ssh.

    Creates the local directory ``<nsmdir>/<nsmhost>`` if it does not exist,
    then starts an ssh session that changes into that directory, sets
    NSMLOGDIR to it and runs ``nsmd2 -f -p <port> -s <port> -h <nsmhost>``.
    The ssh process is not waited for; the function only pauses one second.

    Args:
        nsmdir: Base directory holding one sub-directory per host.
        port: Port passed to nsmd2 (string).
        nsmhost: Host on which nsmd2 is started.
    """
    # NOTE(review): the remote command uses "setenv", so the remote login
    # shell is presumably csh/tcsh -- confirm.
    # Directory used for logging
    logdir = nsmdir + '/' + nsmhost
    if not os.path.exists(logdir):
        os.mkdir(logdir)
    nsmd = 'nsmd2 -f -p ' + port + ' -s ' + port + ' -h '
    cmd = ''.join(['ssh ', nsmhost, ' "cd ', logdir, '; setenv NSMLOGDIR ',
                   logdir, ';', nsmd, nsmhost, '"'])
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
68 
69 
70 # p.wait()
71 
72 # Kill NSMD
73 
def kill_nsmd(port, nsmhost):
    """Kill the nsmd2 processes on a host whose ps line mentions a port.

    Lists candidate processes with ``ps -fC nsmd2 | grep <port>`` over ssh,
    takes the second column of each line as the PID and sends ``kill <pid>``
    over ssh for every positive PID.

    Args:
        port: Text searched for in the ps output (string).
        nsmhost: Host on which nsmd2 is to be killed.
    """
    # "\\$2" reaches the local shell as \$2, which keeps it from expanding $2
    # itself; the field reference is left for awk on the remote side.
    list_cmd = 'ssh ' + nsmhost + ' "ps -fC nsmd2 | grep ' + port \
        + "| awk '{print \\$2}' \""
    # Read the PID list from a pipe rather than a temp.pid file in the
    # current directory, which was never closed or removed.
    lister = subprocess.Popen(list_cmd, shell=True, stdout=subprocess.PIPE,
                              universal_newlines=True)
    listing, _ = lister.communicate()
    for line in listing.splitlines():
        token = line.strip()
        # Skip blank or non-numeric lines instead of failing in int().
        if not token.isdigit():
            continue
        pid = int(token)
        if pid > 0:
            kill_cmd = 'ssh ' + nsmhost + ' "kill ' + str(pid) + '"'
            subprocess.Popen(kill_cmd, shell=True).wait()
88 
89 
90 # Start NSMD on all nodes
91 
def start_nsmd(conffile):
    """Start nsmd2 on all nodes named in the configuration.

    Nodes are handled in this order: control node, event server node,
    output server node, then the event processor nodes.

    Args:
        conffile: Configuration name passed to get_rfgetconf().
    """
    def launch(host):
        # Start nsmd2 on one host and report it.
        run_nsmd(nsmdir, port, host)
        print('nsmd on %s started' % host)

    # Global parameters
    nsmdir = get_rfgetconf(conffile, 'system', 'nsmdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')

    # Control node
    ctlhost = get_rfgetconf(conffile, 'master', 'ctlhost')
    launch(ctlhost)

    # Event server node; skipped when its name occurs in the control host name
    evshost = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    if evshost not in ctlhost:
        launch(evshost)

    # Output server node
    opshost = get_rfgetconf(conffile, 'collector', 'ctlhost')
    launch(opshost)

    # Event processor nodes; ids whose two-digit form occurs in badlist are
    # skipped
    nnodes = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    first_id = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    badlist = get_rfgetconf(conffile, 'processor', 'badlist')
    evphostbase = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    for number in range(first_id, first_id + nnodes):
        nodeid = '%2.2d' % number
        if nodeid in badlist:
            continue
        launch(evphostbase + nodeid)
124 
125 
def stop_nsmd(conffile):
    """Kill nsmd2 on all nodes named in the configuration.

    Nodes are handled in this order: control node, event server node,
    output server node, then the event processor nodes.

    Args:
        conffile: Configuration name passed to get_rfgetconf().
    """
    def halt(host):
        # Kill nsmd2 on one host and report it.
        kill_nsmd(port, host)
        print('nsmd on %s stopped' % host)

    port = get_rfgetconf(conffile, 'system', 'nsmport')

    # Kill nsmd on control node
    ctlhost = get_rfgetconf(conffile, 'master', 'ctlhost')
    halt(ctlhost)

    # Kill nsmd on event server node; skipped when its name occurs in the
    # control host name
    evshost = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    if evshost not in ctlhost:
        halt(evshost)

    # Kill nsmd on output server node
    opshost = get_rfgetconf(conffile, 'collector', 'ctlhost')
    halt(opshost)

    # Kill nsmd on event processor nodes; ids whose two-digit form occurs in
    # badlist are skipped
    nnodes = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    first_id = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    badlist = get_rfgetconf(conffile, 'processor', 'badlist')
    evphostbase = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    for number in range(first_id, first_id + nnodes):
        nodeid = '%2.2d' % number
        if nodeid in badlist:
            continue
        halt(evphostbase + nodeid)
156 
157 
158 # RFARM server operations
159 # Run eventserver
160 
def run_eventserver(conffile):
    """Start rf_eventserver on the distributor control host through ssh.

    Creates ``<execdir_base>/distributor`` locally if missing; the remote
    command redirects its output to ``distributor/nsmlog.log`` under the
    base directory. The command line is printed before it is launched and
    the ssh process is not waited for.

    Args:
        conffile: Configuration name passed to get_rfgetconf().
    """
    evshost = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')
    workdir = basedir + '/distributor'
    if not os.path.exists(workdir):
        os.mkdir(workdir)
    # NOTE(review): "setenv" and "> &" are csh syntax; presumably the remote
    # login shell is csh/tcsh -- confirm.
    cmd = ''.join(['ssh ', evshost, ' "cd ', basedir, '; setenv NSM2_PORT ',
                   port, '; rf_eventserver ', get_configpath(conffile),
                   ' > & distributor/nsmlog.log" '])
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
173 
174 
175 # Stop eventserver
176 
def stop_eventserver(conffile):
    """Kill the event server and remove its ring buffer and shared memory.

    Every non-blank line of ``<execdir_base>/distributor/pid.data`` is taken
    as a PID. For each one an ssh command on the distributor control host
    runs ``kill``, ``removerb <unit>:<ringbuffer>`` and
    ``removeshm <unit>:distributor``, and is waited for.

    Args:
        conffile: Configuration name passed to get_rfgetconf().

    Raises:
        IOError/OSError: When the pid file cannot be opened.
    """
    evshost = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    unit = get_rfgetconf(conffile, 'system', 'unitname')
    ringbuf = get_rfgetconf(conffile, 'distributor', 'ringbuffer')
    rbufname = unit + ':' + ringbuf
    shmname = unit + ':distributor'
    pidfile = basedir + '/distributor/pid.data'
    # Strip the line terminator so no newline ends up inside the quoted remote
    # command, and close the file before running ssh.
    with open(pidfile, 'r') as handle:
        pids = [entry.strip() for entry in handle]
    for pid in pids:
        if not pid:
            continue
        cmd = 'ssh ' + evshost + ' "kill ' + pid + '; removerb ' + rbufname \
            + "; removeshm " + shmname + '"'
        print(cmd)
        subprocess.Popen(cmd, shell=True).wait()
194 
195 
196 # Run outputserver
197 
def run_outputserver(conffile):
    """Start rf_outputserver on the collector control host through ssh.

    Creates ``<execdir_base>/collector`` locally if missing; the remote
    command redirects its output to ``collector/nsmlog.log`` under the base
    directory. The command line is printed before it is launched and the
    ssh process is not waited for.

    Args:
        conffile: Configuration name passed to get_rfgetconf().
    """
    opshost = get_rfgetconf(conffile, 'collector', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')
    workdir = basedir + '/collector'
    if not os.path.exists(workdir):
        os.mkdir(workdir)
    # NOTE(review): "setenv" and "> &" are csh syntax; presumably the remote
    # login shell is csh/tcsh -- confirm.
    pieces = ['ssh ', opshost, ' "cd ', basedir, '; setenv NSM2_PORT ', port,
              '; rf_outputserver ', get_configpath(conffile),
              ' > & collector/nsmlog.log" ']
    cmd = ''.join(pieces)
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
210 
211 
212 # Stop outputserver
213 
def stop_outputserver(conffile):
    """Kill the output server and remove its ring buffers and shared memory.

    Every non-blank line of ``<execdir_base>/collector/pid.data`` is taken
    as a PID. For each one an ssh command on the collector control host runs
    ``kill``, ``removerb`` for the input and output ring buffers,
    ``removeshm <unit>:collector`` and ``clear_basf2_ipc``, and is waited
    for.

    Args:
        conffile: Configuration name passed to get_rfgetconf().

    Raises:
        IOError/OSError: When the pid file cannot be opened.
    """
    opshost = get_rfgetconf(conffile, 'collector', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    unit = get_rfgetconf(conffile, 'system', 'unitname')
    rbufin = get_rfgetconf(conffile, 'collector', 'ringbufin')
    rbufout = get_rfgetconf(conffile, 'collector', 'ringbufout')
    rbufinname = unit + ':' + rbufin
    rbufoutname = unit + ':' + rbufout
    shmname = unit + ':collector'
    pidfile = basedir + '/collector/pid.data'
    # Strip the line terminator so no newline ends up inside the quoted remote
    # command, and close the file before running ssh.
    with open(pidfile, 'r') as handle:
        pids = [entry.strip() for entry in handle]
    for pid in pids:
        if not pid:
            continue
        cmd = 'ssh ' + opshost + ' "kill ' + pid + '; removerb ' + rbufinname \
            + '; removerb ' + rbufoutname + '; removeshm ' + shmname \
            + '; clear_basf2_ipc"'
        print(cmd)
        subprocess.Popen(cmd, shell=True).wait()
233 
234 
235 # Start event processor
236 
def run_eventprocessor(conffile):
    """Start rf_eventprocessor on every usable event processor node.

    Node ids run from ``idbase`` for ``nnodes`` entries; an id whose
    two-digit form occurs in ``badlist`` is skipped. For each remaining node
    the local directory ``<execdir_base>/evp_<nodebase><id>`` is created if
    missing and rf_eventprocessor is launched through ssh with its output
    redirected to ``evp_<nodebase><id>/nsmlog.log``. Each command line is
    printed, the ssh process is not waited for, and the function pauses one
    second after each launch.

    Args:
        conffile: Configuration name passed to get_rfgetconf().
    """
    hostbase = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    nodebase = get_rfgetconf(conffile, 'processor', 'nodebase')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')
    nnodes = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    badlist = get_rfgetconf(conffile, 'processor', 'badlist')
    # The configuration path is the same for every node.
    confpath = get_configpath(conffile)

    for i in range(procid, procid + nnodes):
        nodeid = '%2.2d' % i
        if nodeid in badlist:
            continue
        evphost = hostbase + nodeid
        nodename = nodebase + nodeid
        workdir = basedir + '/evp_' + nodename
        if not os.path.exists(workdir):
            os.mkdir(workdir)
        # NOTE(review): "setenv" and "> &" are csh syntax; presumably the
        # remote login shell is csh/tcsh -- confirm.
        cmd = 'ssh ' + evphost + ' "cd ' + basedir + '; setenv NSM2_PORT ' \
            + port + '; rf_eventprocessor ' + confpath \
            + ' > & evp_' + nodename + '/nsmlog.log" '
        print(cmd)
        subprocess.Popen(cmd, shell=True)
        time.sleep(1)
260 
261 
262 # Stop event processor
263 
def stop_eventprocessor(conffile):
    """Kill the event processors and clean up their IPC resources.

    Node ids run from ``idbase`` for ``nnodes`` entries; an id whose
    two-digit form occurs in ``badlist`` is skipped. For each remaining node
    the shared memory name is printed and every non-blank line of
    ``<execdir_base>/evp_<nodebase><id>/pid.data`` is taken as a PID. For
    each PID an ssh command on the node runs ``kill``, ``removerb`` for the
    collector input and output ring buffer names, ``removeshm`` and
    ``clear_basf2_ipc``, and is waited for.

    Args:
        conffile: Configuration name passed to get_rfgetconf().

    Raises:
        IOError/OSError: When a pid file cannot be opened.
    """
    hostbase = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    nodebase = get_rfgetconf(conffile, 'processor', 'nodebase')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    nnodes = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    badlist = get_rfgetconf(conffile, 'processor', 'badlist')

    unit = get_rfgetconf(conffile, 'system', 'unitname')
    rbufin = get_rfgetconf(conffile, 'collector', 'ringbufin')
    rbufout = get_rfgetconf(conffile, 'collector', 'ringbufout')
    rbufinname = unit + ':' + rbufin
    rbufoutname = unit + ':' + rbufout

    for i in range(procid, procid + nnodes):
        nodeid = '%2.2d' % i
        if nodeid in badlist:
            continue
        evphost = hostbase + nodeid
        nodename = 'evp_' + nodebase + nodeid
        shmname = unit + ':' + nodename
        print(shmname)
        pidfile = basedir + '/' + nodename + '/pid.data'
        # Strip the line terminator so no newline ends up inside the quoted
        # remote command, and close the file before running ssh.
        with open(pidfile, 'r') as handle:
            pids = [entry.strip() for entry in handle]
        for pid in pids:
            if not pid:
                continue
            cmd = 'ssh ' + evphost + ' "kill ' + pid + '; removerb ' \
                + rbufinname + '; removerb ' + rbufoutname \
                + '; removeshm ' + shmname + '; clear_basf2_ipc"'
            print(cmd)
            subprocess.Popen(cmd, shell=True).wait()
300 
301 
302 # Run dqmserver
303 
def run_dqmserver(conffile):
    """Start rf_dqmserver on the dqmserver control host through ssh.

    Creates ``<execdir_base>/dqmserver`` locally if missing; the remote
    command redirects its output to ``dqmserver/nsmlog.log`` under the base
    directory. The command line is printed before it is launched and the
    ssh process is not waited for.

    Args:
        conffile: Configuration name passed to get_rfgetconf().
    """
    dqmhost = get_rfgetconf(conffile, 'dqmserver', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')
    workdir = basedir + '/dqmserver'
    if not os.path.exists(workdir):
        os.mkdir(workdir)
    # NOTE(review): "setenv" and "> &" are csh syntax; presumably the remote
    # login shell is csh/tcsh -- confirm.
    cmd = ''.join(['ssh ', dqmhost, ' "cd ', basedir, '; setenv NSM2_PORT ',
                   port, '; rf_dqmserver ', get_configpath(conffile),
                   ' > & dqmserver/nsmlog.log" '])
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
316 
317 
318 # Stop dqmserver
319 
def stop_dqmserver(conffile):
    """Kill the DQM server processes.

    Every non-blank line of ``<execdir_base>/dqmserver/pid.data`` is taken
    as a PID and killed through ssh on the dqmserver control host; each ssh
    command is waited for.

    Args:
        conffile: Configuration name passed to get_rfgetconf().

    Raises:
        IOError/OSError: When the pid file cannot be opened.
    """
    dqmhost = get_rfgetconf(conffile, 'dqmserver', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    pidfile = basedir + '/dqmserver/pid.data'
    # Strip the line terminator so no newline ends up inside the quoted remote
    # command, and close the file before running ssh.
    with open(pidfile, 'r') as handle:
        pids = [entry.strip() for entry in handle]
    for pid in pids:
        if not pid:
            continue
        cmd = 'ssh ' + dqmhost + ' "kill ' + pid + '"'
        print(cmd)
        subprocess.Popen(cmd, shell=True).wait()
331 
332 
333 # Run roisender
334 
def run_roisender(conffile):
    """Start rf_roisender on the roisender control host through ssh.

    Creates ``<execdir_base>/roisender`` locally if missing; the remote
    command redirects its output to ``roisender/nsmlog.log`` under the base
    directory. The command line is printed before it is launched and the
    ssh process is not waited for.

    Args:
        conffile: Configuration name passed to get_rfgetconf().
    """
    roihost = get_rfgetconf(conffile, 'roisender', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')
    workdir = basedir + '/roisender'
    if not os.path.exists(workdir):
        os.mkdir(workdir)
    # NOTE(review): "setenv" and "> &" are csh syntax; presumably the remote
    # login shell is csh/tcsh -- confirm.
    pieces = ['ssh ', roihost, ' "cd ', basedir, '; setenv NSM2_PORT ', port,
              '; rf_roisender ', get_configpath(conffile),
              ' > & roisender/nsmlog.log" ']
    cmd = ''.join(pieces)
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
347 
348 
349 # Stop roisender
350 
def stop_roisender(conffile):
    """Kill the ROI sender and remove its shared memory.

    Every non-blank line of ``<execdir_base>/roisender/pid.data`` is taken
    as a PID. For each one an ssh command on the roisender control host runs
    ``kill`` and ``removeshm <unit>:roisender``, and is waited for.

    Args:
        conffile: Configuration name passed to get_rfgetconf().

    Raises:
        IOError/OSError: When the pid file cannot be opened.
    """
    roihost = get_rfgetconf(conffile, 'roisender', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    unit = get_rfgetconf(conffile, 'system', 'unitname')
    shmname = unit + ':roisender'
    pidfile = basedir + '/roisender/pid.data'
    # Strip the line terminator so no newline ends up inside the quoted remote
    # command, and close the file before running ssh.
    with open(pidfile, 'r') as handle:
        pids = [entry.strip() for entry in handle]
    for pid in pids:
        if not pid:
            continue
        cmd = 'ssh ' + roihost + ' "kill ' + pid + '; removeshm ' + shmname + '"'
        print(cmd)
        subprocess.Popen(cmd, shell=True).wait()
364 
365 
366 # Run local master
367 
def run_master(conffile):
    """Start rf_master_local on the master control host through ssh.

    Creates ``<execdir_base>/master`` locally if missing; the remote command
    redirects its output to ``master/nsmlog.log`` under the base directory.
    The command line is printed before it is launched and the ssh process is
    not waited for.

    Args:
        conffile: Configuration name passed to get_rfgetconf().
    """
    masterhost = get_rfgetconf(conffile, 'master', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')
    workdir = basedir + '/master'
    if not os.path.exists(workdir):
        os.mkdir(workdir)
    # NOTE(review): "setenv" and "> &" are csh syntax; presumably the remote
    # login shell is csh/tcsh -- confirm.
    cmd = ''.join(['ssh ', masterhost, ' "cd ', basedir,
                   '; setenv NSM2_PORT ', port, '; rf_master_local ',
                   get_configpath(conffile), ' > & master/nsmlog.log" '])
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
380 
381 
382 # Stop local master
383 
def stop_master(conffile):
    """Kill the local master processes.

    Every non-blank line of ``<execdir_base>/master/pid.data`` is taken as a
    PID and killed through ssh on the master control host; each ssh command
    is waited for.

    Args:
        conffile: Configuration name passed to get_rfgetconf().

    Raises:
        IOError/OSError: When the pid file cannot be opened.
    """
    masterhost = get_rfgetconf(conffile, 'master', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    pidfile = basedir + '/master/pid.data'
    # Strip the line terminator so no newline ends up inside the quoted remote
    # command, and close the file before running ssh.
    with open(pidfile, 'r') as handle:
        pids = [entry.strip() for entry in handle]
    for pid in pids:
        if not pid:
            continue
        cmd = 'ssh ' + masterhost + ' "kill ' + pid + '"'
        print(cmd)
        subprocess.Popen(cmd, shell=True).wait()
395 
396 
def start_rfarm_components(conffile):
    """Start the RFARM components other than the master.

    Launch order: event processors, output server, event server, DQM
    server, ROI sender.

    Args:
        conffile: Configuration name handed to each launcher.
    """
    launchers = (
        run_eventprocessor,
        run_outputserver,
        run_eventserver,
        run_dqmserver,
        run_roisender,
    )
    for launch in launchers:
        launch(conffile)
403 
404 
405 # Stop RFARM components
406 
def stop_rfarm_components(conffile):
    """Stop the ROI sender, DQM server, event server and output server.

    Progress messages are printed around the DQM server step. Event
    processors are not stopped here; stop_rfarm_local() calls
    stop_eventprocessor() itself.

    Args:
        conffile: Configuration name handed to each stop function.
    """
    stop_roisender(conffile)

    print("stop dqmserver")
    stop_dqmserver(conffile)
    print("done")

    for halt in (stop_eventserver, stop_outputserver):
        halt(conffile)
415 
416 
417 # Start RFARM local operation
418 
def start_rfarm_local(conffile):
    """Start RFARM local operation: all components first, then the master.

    Args:
        conffile: Configuration name handed to the launchers.
    """
    for launch in (start_rfarm_components, run_master):
        launch(conffile)
422 
423 
424 # Stop RFARM local operation
425 
def stop_rfarm_local(conffile):
    """Stop RFARM local operation.

    Stops the event processors, then the other components, then the master,
    printing a progress message before each step.

    Args:
        conffile: Configuration name handed to each stop function.
    """
    steps = (
        ("stopping eventprocessors", stop_eventprocessor),
        ("stopping rfarm components", stop_rfarm_components),
        ("stopping master", stop_master),
    )
    for message, halt in steps:
        print(message)
        halt(conffile)