Belle II Software  release-05-01-25
rfarmutil_bash.py
1 #!/usr/bin/env python
2 # DESY-TB special Nov.8, 2016, R.Itoh
3 # -*- coding: utf-8 -*-
4 
5 import os
6 import sys
7 import subprocess
8 import signal
9 import socket
10 import time
11 
12 
13 # Basic Utilities
14 # Get full path of the configuration file
15 
def get_configpath(conffile):
    """Return the full path of the configuration file named *conffile*.

    The path is built as ``$RFARM_CONFDIR/<conffile>.conf``.

    :param conffile: configuration name without the ``.conf`` suffix.
    :return: the string ``<RFARM_CONFDIR>/<conffile>.conf``.
    :raises SystemExit: with status 1 when ``RFARM_CONFDIR`` is not set.
    """
    confdir = os.environ.get('RFARM_CONFDIR')
    if confdir is None:
        # Test for None directly instead of comparing against the string
        # 'None', and leave with a non-zero status so that calling scripts
        # can tell the lookup failed.
        print('RFARM_CONFDIR is not defined. Exit.')
        sys.exit(1)
    return confdir + '/' + conffile + '.conf'
23 
24 
25 # Get Configuration from config file
def get_rfgetconf(conffile, item1, item2='NULL', item3='NULL'):
    """Query the ``rfgetconf`` command for one configuration value.

    Runs ``rfgetconf <config path> <item1> <item2> <item3>`` through the
    shell and returns whatever the command wrote to its standard output.

    :param conffile: configuration name, resolved with get_configpath().
    :param item1: first key passed to rfgetconf.
    :param item2: second key passed to rfgetconf (default ``'NULL'``).
    :param item3: third key passed to rfgetconf (default ``'NULL'``).
    :return: the unmodified standard output of rfgetconf.
    :raises SystemExit: with status 1 when ``RFARM_CONFDIR`` is not set.
    """
    if os.environ.get('RFARM_CONFDIR') is None:
        print('RFARM_CONFDIR is not defined. Exit.')
        sys.exit(1)
    cmd = ' '.join(['rfgetconf', get_configpath(conffile), item1, item2,
                    item3])
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    # communicate() drains stdout and stderr while waiting for the child.
    # Calling wait() first can deadlock once the child fills a pipe buffer.
    output, _ = p.communicate()
    return output
40 
41 
42 # print "waiting"
43 # confout = p.stdout.read()
44 
45 # NSMD related utilities
46 # Run NSMD
47 
def run_nsmd(nsmdir, port, nsmhost):
    """Launch ``nsmd2`` on *nsmhost* through ssh.

    A per-host directory ``<nsmdir>/<nsmhost>`` is created locally when it
    does not exist yet; the remote command changes into that path and
    exports it as ``NSMLOGDIR`` before starting the daemon.

    :param nsmdir: base directory holding the per-host directories.
    :param port: port string handed to nsmd2 for both ``-p`` and ``-s``.
    :param nsmhost: host to ssh into, also passed to nsmd2 with ``-h``.
    """
    # Check directory for logging
    logdir = nsmdir + '/' + nsmhost
    if not os.path.exists(logdir):
        os.mkdir(logdir)

    daemon = ''.join([str(os.environ.get('BELLE2_EXTERNALS_BIN')),
                      '/nsmd2 -f -p ', port, ' -s ', port, ' -h '])
    cmd = ''.join(['ssh ', nsmhost, ' "cd ', logdir,
                   '; export NSMLOGDIR=', logdir, ';', daemon, nsmhost, '"'])
    print(cmd)
    launcher = subprocess.Popen(cmd, shell=True)
    # The ssh process is not waited for; just pause before returning.
    time.sleep(1)
60 
61 
62 # p.wait()
63 
64 # Kill NSMD
65 
def kill_nsmd(port, nsmhost):
    """Kill the ``nsmd2`` processes on *nsmhost* whose ps line matches *port*.

    The remote process list is filtered with ``grep <port>`` and the second
    column of each matching line is written to ``temp.pid`` in the current
    working directory. Every positive pid found there is then killed
    through a separate ssh call.

    :param port: port string used as the grep pattern.
    :param nsmhost: host to ssh into.
    """
    # '\\$2' keeps the backslash so that the local shell hands a literal
    # $2 to the remote awk instead of expanding it.
    cmd = 'ssh ' + nsmhost + ' "ps -fC nsmd2 | grep ' + port \
        + "| awk '{print \\$2}' \" > temp.pid"
    p = subprocess.Popen(cmd, shell=True)
    p.wait()

    # Close the file deterministically and ignore blank lines, which would
    # otherwise make int() raise.
    with open('temp.pid', 'r') as pidlist:
        pids = [int(line) for line in pidlist if line.strip()]

    for pid in pids:
        if pid > 0:
            cmd = 'ssh ' + nsmhost + ' "kill ' + str(pid) + '"'
            p = subprocess.Popen(cmd, shell=True)
            p.wait()
80 
81 
82 # Start NSMD on all nodes
83 
def start_nsmd(conffile):
    """Start nsmd on every node named in the configuration *conffile*.

    Order: control node, event server node (skipped when its host name is
    contained in the control host name), output server node, then each
    event processor node whose id does not appear in the bad list.

    :param conffile: configuration name passed to get_rfgetconf().
    """
    # Global parameters
    logbase = get_rfgetconf(conffile, 'system', 'nsmdir_base')
    nsmport = get_rfgetconf(conffile, 'system', 'nsmport')

    # Control node
    master = get_rfgetconf(conffile, 'master', 'ctlhost')
    run_nsmd(logbase, nsmport, master)
    print('nsmd on %s started' % master)

    # Event server node, unless it is the control node
    distributor = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    if distributor not in master:
        run_nsmd(logbase, nsmport, distributor)
        print('nsmd on %s started' % distributor)

    # Output server node
    collector = get_rfgetconf(conffile, 'collector', 'ctlhost')
    run_nsmd(logbase, nsmport, collector)
    print('nsmd on %s started' % collector)

    # Event processor nodes
    count = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    first = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    excluded = get_rfgetconf(conffile, 'processor', 'badlist')
    prefix = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    for offset in range(count):
        nodeid = '%1.1d' % (first + offset)  # DESY only
        if nodeid in excluded:
            continue
        worker = prefix + nodeid
        run_nsmd(logbase, nsmport, worker)
        print('nsmd on %s started' % worker)
117 
118 
def stop_nsmd(conffile):
    """Stop nsmd on every node named in the configuration *conffile*.

    Order: control node, event server node (skipped when its host name is
    contained in the control host name), output server node, then each
    event processor node whose id does not appear in the bad list.

    :param conffile: configuration name passed to get_rfgetconf().
    """
    nsmport = get_rfgetconf(conffile, 'system', 'nsmport')

    # Kill nsmd on control node
    master = get_rfgetconf(conffile, 'master', 'ctlhost')
    kill_nsmd(nsmport, master)
    print('nsmd on %s stopped' % master)

    # Kill nsmd on event server node, unless it is the control node
    distributor = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    if distributor not in master:
        kill_nsmd(nsmport, distributor)
        print('nsmd on %s stopped' % distributor)

    # Kill nsmd on output server node
    collector = get_rfgetconf(conffile, 'collector', 'ctlhost')
    kill_nsmd(nsmport, collector)
    print('nsmd on %s stopped' % collector)

    # Kill nsmd on event processor nodes
    count = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    first = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    excluded = get_rfgetconf(conffile, 'processor', 'badlist')
    prefix = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    for offset in range(count):
        nodeid = '%1.1d' % (first + offset)  # DESY only
        if nodeid in excluded:
            continue
        worker = prefix + nodeid
        kill_nsmd(nsmport, worker)
        print('nsmd on %s stopped' % worker)
150 
151 
152 # RFARM server operations
153 # Run eventserver
154 
def run_eventserver(conffile):
    """Start ``rf_eventserver`` on the distributor host through ssh.

    The ``distributor`` directory under the execution base directory is
    created when missing; the remote command redirects its output to
    ``distributor/nsmlog.log`` relative to that base directory.

    :param conffile: configuration name passed to get_rfgetconf().
    """
    host = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    workdir = get_rfgetconf(conffile, 'system', 'execdir_base')
    nsmport = get_rfgetconf(conffile, 'system', 'nsmport')

    rundir = workdir + '/distributor'
    if not os.path.exists(rundir):
        os.mkdir(rundir)

    cmd = ''.join(['ssh ', host, ' "cd ', workdir, '; export NSM2_PORT=',
                   nsmport, '; rf_eventserver ', get_configpath(conffile),
                   ' &> distributor/nsmlog.log" '])
    print(cmd)
    launcher = subprocess.Popen(cmd, shell=True)
    # The ssh process is not waited for; just pause before returning.
    time.sleep(1)
167 
168 
169 # Stop eventserver
170 
def stop_eventserver(conffile):
    """Stop the event server and remove its ring buffer and shared memory.

    Sends ``RF_UNCONFIGURE`` to the ``distributor`` node with rfcommand.
    Then, for every pid listed in ``<execdir_base>/distributor/pid.data``,
    runs ``kill``, ``removerb`` and ``removeshm`` on the distributor host
    through ssh.

    :param conffile: configuration name passed to get_rfgetconf().
    """
    evshost = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    unit = get_rfgetconf(conffile, 'system', 'unitname')
    ringbuf = get_rfgetconf(conffile, 'distributor', 'ringbuffer')
    rbufname = unit + ':' + ringbuf
    shmname = unit + ':distributor'

    p = subprocess.Popen('rfcommand ' + conffile +
                         ' distributor RF_UNCONFIGURE', shell=True)
    p.wait()

    pidfile = basedir + '/distributor/pid.data'
    # Close the file deterministically. Strip each line so that a trailing
    # newline is not spliced into the middle of the remote command list.
    with open(pidfile, 'r') as pidlist:
        pids = [line.strip() for line in pidlist]

    for pid in pids:
        cmd = 'ssh ' + evshost + ' "kill ' + pid + '; removerb ' + rbufname \
            + "; removeshm " + shmname + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
188 
189 
190 # Run outputserver
191 
def run_outputserver(conffile):
    """Start ``rf_outputserver`` on the collector host through ssh.

    The ``collector`` directory under the execution base directory is
    created when missing; the remote command redirects its output to
    ``collector/nsmlog.log`` relative to that base directory.

    :param conffile: configuration name passed to get_rfgetconf().
    """
    host = get_rfgetconf(conffile, 'collector', 'ctlhost')
    workdir = get_rfgetconf(conffile, 'system', 'execdir_base')
    nsmport = get_rfgetconf(conffile, 'system', 'nsmport')

    rundir = workdir + '/collector'
    if not os.path.exists(rundir):
        os.mkdir(rundir)

    cmd = ''.join(['ssh ', host, ' "cd ', workdir, '; export NSM2_PORT=',
                   nsmport, '; rf_outputserver ', get_configpath(conffile),
                   ' &> collector/nsmlog.log" '])
    print(cmd)
    launcher = subprocess.Popen(cmd, shell=True)
    # The ssh process is not waited for; just pause before returning.
    time.sleep(1)
204 
205 
206 # Stop outputserver
207 
def stop_outputserver(conffile):
    """Stop the output server and remove its ring buffers and shared memory.

    Sends ``RF_UNCONFIGURE`` to the ``collector`` node with rfcommand.
    Then, for every pid listed in ``<execdir_base>/collector/pid.data``,
    runs ``kill``, ``removerb`` (input and output buffer), ``removeshm``
    and ``clear_basf2_ipc`` on the collector host through ssh.

    :param conffile: configuration name passed to get_rfgetconf().
    """
    opshost = get_rfgetconf(conffile, 'collector', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    unit = get_rfgetconf(conffile, 'system', 'unitname')
    rbufin = get_rfgetconf(conffile, 'collector', 'ringbufin')
    rbufout = get_rfgetconf(conffile, 'collector', 'ringbufout')
    rbufinname = unit + ':' + rbufin
    rbufoutname = unit + ':' + rbufout
    shmname = unit + ':collector'

    p = subprocess.Popen('rfcommand ' + conffile + ' collector RF_UNCONFIGURE',
                         shell=True)
    p.wait()

    pidfile = basedir + '/collector/pid.data'
    # Close the file deterministically. Strip each line so that a trailing
    # newline is not spliced into the middle of the remote command list.
    with open(pidfile, 'r') as pidlist:
        pids = [line.strip() for line in pidlist]

    for pid in pids:
        cmd = 'ssh ' + opshost + ' "kill ' + pid + '; removerb ' + rbufinname \
            + '; removerb ' + rbufoutname + '; removeshm ' + shmname \
            + '; clear_basf2_ipc"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
227 
228 
229 # Start event processor
230 
def run_eventprocessor(conffile):
    """Start ``rf_eventprocessor`` on every usable event processor node.

    For each id in ``[idbase, idbase + nnodes)`` whose one-digit form is
    not found in the bad list, the directory
    ``<execdir_base>/evp_<nodebase><2-digit id>`` is created when missing
    and rf_eventprocessor is started through ssh on
    ``<ctlhostbase><1-digit id>``, with its output redirected to
    ``nsmlog.log`` in that directory.

    :param conffile: configuration name passed to get_rfgetconf().
    """
    hostbase = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    nodebase = get_rfgetconf(conffile, 'processor', 'nodebase')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')
    nnodes = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    badlist = get_rfgetconf(conffile, 'processor', 'badlist')

    for i in range(procid, procid + nnodes):
        nodenum = '%2.2d' % i  # used in the node/directory name
        nodeid = '%1.1d' % i  # used in the host name; DESY only
        if badlist.find(nodeid) != -1:
            continue
        evphost = hostbase + nodeid
        nodename = nodebase + nodenum
        rundir = basedir + '/evp_' + nodename
        if not os.path.exists(rundir):
            os.mkdir(rundir)
        cmd = 'ssh ' + evphost + ' "cd ' + basedir + '; export NSM2_PORT=' \
            + port + '; rf_eventprocessor ' + get_configpath(conffile) \
            + ' &> evp_' + nodename + '/nsmlog.log" '
        print(cmd)
        # The ssh process is not waited for; just pause before the next node.
        subprocess.Popen(cmd, shell=True)
        time.sleep(1)
255 
256 
257 # Stop event processor
258 
def stop_eventprocessor(conffile):
    """Stop every usable event processor node and clean up its IPC objects.

    For each id in ``[idbase, idbase + nnodes)`` whose one-digit form is
    not found in the bad list, ``RF_UNCONFIGURE`` is sent to
    ``evp_<nodebase><2-digit id>`` with rfcommand. Then, for every pid in
    that node's ``pid.data`` file, ``kill``, ``removerb`` (collector input
    and output buffer names), ``removeshm`` and ``clear_basf2_ipc`` are run
    on the node's host through ssh.

    :param conffile: configuration name passed to get_rfgetconf().
    """
    hostbase = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    nodebase = get_rfgetconf(conffile, 'processor', 'nodebase')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    nnodes = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    badlist = get_rfgetconf(conffile, 'processor', 'badlist')

    unit = get_rfgetconf(conffile, 'system', 'unitname')
    rbufin = get_rfgetconf(conffile, 'collector', 'ringbufin')
    rbufout = get_rfgetconf(conffile, 'collector', 'ringbufout')
    rbufinname = unit + ':' + rbufin
    rbufoutname = unit + ':' + rbufout

    for i in range(procid, procid + nnodes):
        # Same id formats as run_eventprocessor(): two digits for the node
        # name, one digit for the host name and the bad-list check, so the
        # kill goes to the host the processor was started on.
        nodenum = '%2.2d' % i
        nodeid = '%1.1d' % i  # DESY only
        if badlist.find(nodeid) != -1:
            continue
        evphost = hostbase + nodeid
        nodename = 'evp_' + nodebase + nodenum
        shmname = unit + ':' + nodename
        print(shmname)
        p = subprocess.Popen('rfcommand ' + conffile + ' ' + nodename +
                             ' RF_UNCONFIGURE', shell=True)
        p.wait()

        pidfile = basedir + '/' + nodename + '/pid.data'
        # Close the file deterministically. Strip each line so that a
        # trailing newline is not spliced into the remote command list.
        with open(pidfile, 'r') as pidlist:
            pids = [line.strip() for line in pidlist]

        for pid in pids:
            cmd = 'ssh ' + evphost + ' "kill ' + pid + '; removerb ' \
                + rbufinname + '; removerb ' + rbufoutname \
                + '; removeshm ' + shmname + '; clear_basf2_ipc"'
            print(cmd)
            p = subprocess.Popen(cmd, shell=True)
            p.wait()
296 
297 
298 # Run dqmserver
299 
def run_dqmserver(conffile):
    """Start ``rf_dqmserver`` on the dqmserver host through ssh.

    The ``dqmserver`` directory under the execution base directory is
    created when missing; the remote command redirects its output to
    ``dqmserver/nsmlog.log`` relative to that base directory.

    :param conffile: configuration name passed to get_rfgetconf().
    """
    host = get_rfgetconf(conffile, 'dqmserver', 'ctlhost')
    workdir = get_rfgetconf(conffile, 'system', 'execdir_base')
    nsmport = get_rfgetconf(conffile, 'system', 'nsmport')

    rundir = workdir + '/dqmserver'
    if not os.path.exists(rundir):
        os.mkdir(rundir)

    cmd = ''.join(['ssh ', host, ' "cd ', workdir, '; export NSM2_PORT=',
                   nsmport, '; rf_dqmserver ', get_configpath(conffile),
                   ' &> dqmserver/nsmlog.log" '])
    print(cmd)
    launcher = subprocess.Popen(cmd, shell=True)
    # The ssh process is not waited for; just pause before returning.
    time.sleep(1)
312 
313 
314 # Stop dqmserver
315 
def stop_dqmserver(conffile):
    """Stop the DQM server.

    Sends ``RF_UNCONFIGURE`` to the ``dqmserver`` node with rfcommand, then
    kills every pid listed in ``<execdir_base>/dqmserver/pid.data`` on the
    dqmserver host through ssh.

    :param conffile: configuration name passed to get_rfgetconf().
    """
    dqmhost = get_rfgetconf(conffile, 'dqmserver', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')

    p = subprocess.Popen('rfcommand ' + conffile + ' dqmserver RF_UNCONFIGURE',
                         shell=True)
    p.wait()

    pidfile = basedir + '/dqmserver/pid.data'
    # Close the file deterministically. Strip each line so that a trailing
    # newline does not end up inside the quoted remote command.
    with open(pidfile, 'r') as pidlist:
        pids = [line.strip() for line in pidlist]

    for pid in pids:
        cmd = 'ssh ' + dqmhost + ' "kill ' + pid + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
327 
328 
329 # Run roisender
330 
def run_roisender(conffile):
    """Start ``rf_roisender`` on the roisender host through ssh.

    The ``roisender`` directory under the execution base directory is
    created when missing; the remote command redirects its output to
    ``roisender/nsmlog.log`` relative to that base directory.

    :param conffile: configuration name passed to get_rfgetconf().
    """
    host = get_rfgetconf(conffile, 'roisender', 'ctlhost')
    workdir = get_rfgetconf(conffile, 'system', 'execdir_base')
    nsmport = get_rfgetconf(conffile, 'system', 'nsmport')

    rundir = workdir + '/roisender'
    if not os.path.exists(rundir):
        os.mkdir(rundir)

    cmd = ''.join(['ssh ', host, ' "cd ', workdir, '; export NSM2_PORT=',
                   nsmport, '; rf_roisender ', get_configpath(conffile),
                   ' &> roisender/nsmlog.log" '])
    print(cmd)
    launcher = subprocess.Popen(cmd, shell=True)
    # The ssh process is not waited for; just pause before returning.
    time.sleep(1)
343 
344 
345 # Stop roisender
346 
def stop_roisender(conffile):
    """Stop the ROI sender and remove its shared memory.

    Sends ``RF_UNCONFIGURE`` to the ``roisender`` node with rfcommand.
    Then, for every pid listed in ``<execdir_base>/roisender/pid.data``,
    runs ``kill`` and ``removeshm`` on the roisender host through ssh.

    :param conffile: configuration name passed to get_rfgetconf().
    """
    roihost = get_rfgetconf(conffile, 'roisender', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    unit = get_rfgetconf(conffile, 'system', 'unitname')
    shmname = unit + ':roisender'

    p = subprocess.Popen('rfcommand ' + conffile + ' roisender RF_UNCONFIGURE',
                         shell=True)
    p.wait()

    pidfile = basedir + '/roisender/pid.data'
    # Close the file deterministically. Strip each line so that a trailing
    # newline is not spliced into the middle of the remote command list.
    with open(pidfile, 'r') as pidlist:
        pids = [line.strip() for line in pidlist]

    for pid in pids:
        cmd = 'ssh ' + roihost + ' "kill ' + pid + '; removeshm ' + shmname \
            + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
360 
361 
362 # Run local master
363 
def run_master(conffile):
    """Start ``rf_master_local`` on the master host through ssh.

    The ``master`` directory under the execution base directory is created
    when missing; the remote command redirects its output to
    ``master/nsmlog.log`` relative to that base directory.

    :param conffile: configuration name passed to get_rfgetconf().
    """
    host = get_rfgetconf(conffile, 'master', 'ctlhost')
    workdir = get_rfgetconf(conffile, 'system', 'execdir_base')
    nsmport = get_rfgetconf(conffile, 'system', 'nsmport')

    rundir = workdir + '/master'
    if not os.path.exists(rundir):
        os.mkdir(rundir)

    cmd = ''.join(['ssh ', host, ' "cd ', workdir, '; export NSM2_PORT=',
                   nsmport, '; rf_master_local ', get_configpath(conffile),
                   ' &> master/nsmlog.log" '])
    print(cmd)
    launcher = subprocess.Popen(cmd, shell=True)
    # The ssh process is not waited for; just pause before returning.
    time.sleep(1)
376 
377 
378 # Stop local master
379 
def stop_master(conffile):
    """Stop the local master.

    Kills every pid listed in ``<execdir_base>/master/pid.data`` on the
    master host through ssh. No ``RF_UNCONFIGURE`` is sent here; that
    rfcommand call was already disabled in this function.

    :param conffile: configuration name passed to get_rfgetconf().
    """
    masterhost = get_rfgetconf(conffile, 'master', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')

    pidfile = basedir + '/master/pid.data'
    # Close the file deterministically. Strip each line so that a trailing
    # newline does not end up inside the quoted remote command.
    with open(pidfile, 'r') as pidlist:
        pids = [line.strip() for line in pidlist]

    for pid in pids:
        cmd = 'ssh ' + masterhost + ' "kill ' + pid + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
391 
392 
def start_rfarm_components(conffile):
    """Start all RFARM components except the master.

    Launch order: event processors, output server, event server, DQM
    server, ROI sender.

    :param conffile: configuration name handed to each run_* function.
    """
    launchers = (
        run_eventprocessor,
        run_outputserver,
        run_eventserver,
        run_dqmserver,
        run_roisender,
    )
    for launch in launchers:
        launch(conffile)
398  run_roisender(conffile)
399 
400 
401 # Stop RFARM components
402 
def stop_rfarm_components(conffile):
    """Stop all RFARM components except the master.

    Shutdown order: ROI sender, DQM server, event server, output server,
    event processors.

    :param conffile: configuration name handed to each stop_* function.
    """
    stoppers = (
        stop_roisender,
        stop_dqmserver,
        stop_eventserver,
        stop_outputserver,
        stop_eventprocessor,
    )
    for stop in stoppers:
        stop(conffile)
409 
410 
411 # Start RFARM local operation
412 
def start_rfarm_local(conffile):
    """Start RFARM local operation: all components first, then the master.

    :param conffile: configuration name handed to the start functions.
    """
    for bring_up in (start_rfarm_components, run_master):
        bring_up(conffile)
416 
417 
418 # Stop RFARM local operation
419 
def stop_rfarm_local(conffile):
    """Stop RFARM local operation: all components first, then the master.

    :param conffile: configuration name handed to the stop functions.
    """
    for shut_down in (stop_rfarm_components, stop_master):
        shut_down(conffile)