Belle II Software  release-06-02-00
rfarmutil_bash.py
1 #!/usr/bin/env python
2 # DESY-TB special Nov.8, 2016, R.Itoh
3 # -*- coding: utf-8 -*-
4 
5 
12 
13 import os
14 import sys
15 import subprocess
16 import signal
17 import socket
18 import time
19 
20 
21 # Basic Utilities
22 # Get full path of the configuration file
23 
def get_configpath(conffile):
    """Return the full path of a configuration file.

    The path is built as ``$RFARM_CONFDIR/<conffile>.conf``.

    Args:
        conffile: Configuration name, without directory and without the
            ``.conf`` suffix.

    Returns:
        The path as a string.

    Raises:
        SystemExit: via ``sys.exit()`` when RFARM_CONFDIR is not set.
    """
    confdir = os.environ.get('RFARM_CONFDIR')
    # Test for None directly: comparing str(value) with 'None' would also
    # reject a directory that is literally named "None".
    if confdir is None:
        # Single-argument print() behaves identically on Python 2 and 3.
        print('RFARM_CONFDIR is not defined. Exit.')
        sys.exit()
    return confdir + '/' + conffile + '.conf'
31 
32 
33 # Get Configuration from config file
def get_rfgetconf(conffile, item1, item2='NULL', item3='NULL'):
    """Look up a configuration value by running the ``rfgetconf`` command.

    Runs ``rfgetconf <config path> <item1> <item2> <item3>`` through the
    shell and returns whatever the command wrote to its standard output.

    Args:
        conffile: Configuration name as accepted by get_configpath().
        item1: First lookup key passed to ``rfgetconf``.
        item2: Second lookup key; the literal string 'NULL' by default.
        item3: Third lookup key; the literal string 'NULL' by default.

    Returns:
        The standard output of ``rfgetconf`` as a text string, unmodified
        (no stripping is applied).

    Raises:
        SystemExit: via ``sys.exit()`` when RFARM_CONFDIR is not set.
    """
    if os.environ.get('RFARM_CONFDIR') is None:
        print('RFARM_CONFDIR is not defined. Exit.')
        sys.exit()
    cmd = ' '.join(['rfgetconf', get_configpath(conffile),
                    item1, item2, item3])
    # universal_newlines=True yields text (not bytes) on Python 3, so the
    # result can be concatenated into command strings by the callers.
    proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE, universal_newlines=True)
    # communicate() drains both pipes while waiting.  Calling wait() first
    # and reading afterwards can deadlock once a pipe buffer fills up.
    output, _ = proc.communicate()
    return output
48 
49 
50 # print "waiting"
51 # confout = p.stdout.read()
52 
53 # NSMD related utilities
54 # Run NSMD
55 
def run_nsmd(nsmdir, port, nsmhost):
    """Launch ``nsmd2`` on a host through ssh without waiting for it.

    The directory ``<nsmdir>/<nsmhost>`` is created locally if missing; the
    remote command changes into that same path and exports it as NSMLOGDIR
    before starting ``nsmd2``.

    Args:
        nsmdir: Base directory that holds one sub-directory per host.
        port: Port number as a string; passed to both ``-p`` and ``-s``.
        nsmhost: Host to ssh to; also passed to ``-h``.
    """
    # Check directory for logging
    logdir = nsmdir + '/' + nsmhost
    if not os.path.exists(logdir):
        os.mkdir(logdir)
    # NOTE(review): if BELLE2_EXTERNALS_BIN is unset this becomes
    # 'None/nsmd2 ...' -- confirm whether that should be an error instead.
    nsmd = str(os.environ.get('BELLE2_EXTERNALS_BIN')) + '/nsmd2 -f -p ' \
        + port + ' -s ' + port + ' -h '
    cmd = 'ssh ' + nsmhost + ' "cd ' + logdir \
        + '; export NSMLOGDIR=' + logdir + ';' + nsmd \
        + nsmhost + '"'
    print(cmd)
    # Not waited for: the ssh session is left running in the background.
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
68 
69 
70 # p.wait()
71 
72 # Kill NSMD
73 
def kill_nsmd(port, nsmhost):
    """Kill the ``nsmd2`` processes on a host whose ``ps`` line matches port.

    The PIDs are listed remotely with ``ps -fC nsmd2 | grep <port>`` and each
    one is then killed with a separate ``ssh <host> "kill <pid>"``.

    Args:
        port: Text that ``grep`` looks for in the ``ps -f`` output.
        nsmhost: Host to ssh to.
    """
    # NOTE(review): grep matches <port> anywhere in the ps line, not only in
    # the port argument -- confirm this cannot hit unrelated nsmd2 instances.
    list_cmd = 'ssh ' + nsmhost + ' "ps -fC nsmd2 | grep ' + port \
        + "| awk '{print \\$2}' \""
    # Capture the PID list through a pipe instead of a fixed-name temp.pid
    # in the current directory, which was never closed or removed and could
    # be clobbered by a concurrent invocation.
    lister = subprocess.Popen(list_cmd, shell=True, stdout=subprocess.PIPE,
                              universal_newlines=True)
    listing, _ = lister.communicate()
    for entry in listing.splitlines():
        entry = entry.strip()
        # Skip blank or non-numeric lines rather than crashing in int().
        if not entry.isdigit():
            continue
        pid = int(entry)
        if pid > 0:
            kill_cmd = 'ssh ' + nsmhost + ' "kill ' + str(pid) + '"'
            killer = subprocess.Popen(kill_cmd, shell=True)
            killer.wait()
88 
89 
90 # Start NSMD on all nodes
91 
def start_nsmd(conffile):
    """Start nsmd on the control, event server, output server and
    event processor nodes named in the configuration.

    Args:
        conffile: Configuration name as accepted by get_rfgetconf().
    """
    # Global parameters
    nsmdir = get_rfgetconf(conffile, 'system', 'nsmdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')

    # Run nsmd on control node
    ctlhost = get_rfgetconf(conffile, 'master', 'ctlhost')
    run_nsmd(nsmdir, port, ctlhost)
    print('nsmd on %s started' % ctlhost)

    # Run nsmd on event server node, unless its name occurs within the
    # control host name (substring test, kept as in the original logic).
    evshost = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    if evshost not in ctlhost:
        run_nsmd(nsmdir, port, evshost)
        print('nsmd on %s started' % evshost)

    # Run nsmd on output server node
    opshost = get_rfgetconf(conffile, 'collector', 'ctlhost')
    run_nsmd(nsmdir, port, opshost)
    print('nsmd on %s started' % opshost)

    # Run nsmd on event processor nodes
    nnodes = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    badlist = get_rfgetconf(conffile, 'processor', 'badlist')
    evphostbase = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    for i in range(procid, procid + nnodes):
        # nodeid = '%2.2d' % i
        nodeid = '%1.1d' % i  # DESY only
        # NOTE(review): substring match against badlist; a one-digit id
        # such as '1' also matches an entry like '10' -- confirm format.
        if nodeid not in badlist:
            evphost = evphostbase + nodeid
            run_nsmd(nsmdir, port, evphost)
            print('nsmd on %s started' % evphost)
125 
126 
def stop_nsmd(conffile):
    """Kill nsmd on the control, event server, output server and
    event processor nodes named in the configuration.

    Args:
        conffile: Configuration name as accepted by get_rfgetconf().
    """
    port = get_rfgetconf(conffile, 'system', 'nsmport')

    # Kill nsmd on control node
    ctlhost = get_rfgetconf(conffile, 'master', 'ctlhost')
    kill_nsmd(port, ctlhost)
    print('nsmd on %s stopped' % ctlhost)

    # Kill nsmd on event server node, unless its name occurs within the
    # control host name (substring test, kept as in the original logic).
    evshost = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    if evshost not in ctlhost:
        kill_nsmd(port, evshost)
        print('nsmd on %s stopped' % evshost)

    # Kill nsmd on output server node
    opshost = get_rfgetconf(conffile, 'collector', 'ctlhost')
    kill_nsmd(port, opshost)
    print('nsmd on %s stopped' % opshost)

    # Kill nsmd on event processor nodes
    nnodes = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    badlist = get_rfgetconf(conffile, 'processor', 'badlist')
    evphostbase = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    for i in range(procid, procid + nnodes):
        # nodeid = '%2.2d' % i
        nodeid = '%1.1d' % i  # DESY only
        # NOTE(review): substring match against badlist; a one-digit id
        # such as '1' also matches an entry like '10' -- confirm format.
        if nodeid not in badlist:
            evphost = evphostbase + nodeid
            kill_nsmd(port, evphost)
            print('nsmd on %s stopped' % evphost)
158 
159 
160 # RFARM server operations
161 # Run eventserver
162 
def run_eventserver(conffile):
    """Launch ``rf_eventserver`` on the distributor host through ssh.

    Creates ``<execdir_base>/distributor`` locally if missing, then starts
    the server remotely with NSM2_PORT exported and its output redirected
    to ``distributor/nsmlog.log``.  The ssh session is not waited for.

    Args:
        conffile: Configuration name as accepted by get_rfgetconf().
    """
    evshost = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')
    workdir = basedir + '/distributor'
    if not os.path.exists(workdir):
        os.mkdir(workdir)
    cmd = 'ssh ' + evshost + ' "cd ' + basedir + '; export NSM2_PORT=' + port \
        + '; rf_eventserver ' + get_configpath(conffile) \
        + ' &> distributor/nsmlog.log" '
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
175 
176 
177 # Stop eventserver
178 
def stop_eventserver(conffile):
    """Unconfigure and kill the event server, then remove its IPC objects.

    Sends RF_UNCONFIGURE to ``distributor`` via ``rfcommand``, then for each
    PID listed in ``<execdir_base>/distributor/pid.data`` runs, on the
    distributor host, ``kill`` followed by ``removerb`` and ``removeshm``.

    Args:
        conffile: Configuration name as accepted by get_rfgetconf().
    """
    evshost = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    unit = get_rfgetconf(conffile, 'system', 'unitname')
    ringbuf = get_rfgetconf(conffile, 'distributor', 'ringbuffer')
    rbufname = unit + ':' + ringbuf
    shmname = unit + ':distributor'
    subprocess.call('rfcommand ' + conffile + ' distributor RF_UNCONFIGURE',
                    shell=True)
    pidfile = basedir + '/distributor/pid.data'
    # 'with' closes the pid file; the original left it open.
    with open(pidfile, 'r') as pids:
        for pid in pids:
            # Strip the line terminator: left in place it splits the remote
            # command so that the next line begins with ';'.
            pid = pid.strip()
            if not pid:
                continue
            cmd = 'ssh ' + evshost + ' "kill ' + pid + '; removerb ' \
                + rbufname + '; removeshm ' + shmname + '"'
            print(cmd)
            subprocess.call(cmd, shell=True)
196 
197 
198 # Run outputserver
199 
def run_outputserver(conffile):
    """Launch ``rf_outputserver`` on the collector host through ssh.

    Creates ``<execdir_base>/collector`` locally if missing, then starts
    the server remotely with NSM2_PORT exported and its output redirected
    to ``collector/nsmlog.log``.  The ssh session is not waited for.

    Args:
        conffile: Configuration name as accepted by get_rfgetconf().
    """
    opshost = get_rfgetconf(conffile, 'collector', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')
    workdir = basedir + '/collector'
    if not os.path.exists(workdir):
        os.mkdir(workdir)
    cmd = 'ssh ' + opshost + ' "cd ' + basedir + '; export NSM2_PORT=' + port \
        + '; rf_outputserver ' + get_configpath(conffile) \
        + ' &> collector/nsmlog.log" '
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
212 
213 
214 # Stop outputserver
215 
def stop_outputserver(conffile):
    """Unconfigure and kill the output server, then remove its IPC objects.

    Sends RF_UNCONFIGURE to ``collector`` via ``rfcommand``, then for each
    PID listed in ``<execdir_base>/collector/pid.data`` runs, on the
    collector host, ``kill``, ``removerb`` for the input and output ring
    buffers, ``removeshm`` and ``clear_basf2_ipc``.

    Args:
        conffile: Configuration name as accepted by get_rfgetconf().
    """
    opshost = get_rfgetconf(conffile, 'collector', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    unit = get_rfgetconf(conffile, 'system', 'unitname')
    rbufin = get_rfgetconf(conffile, 'collector', 'ringbufin')
    rbufout = get_rfgetconf(conffile, 'collector', 'ringbufout')
    rbufinname = unit + ':' + rbufin
    rbufoutname = unit + ':' + rbufout
    shmname = unit + ':collector'
    subprocess.call('rfcommand ' + conffile + ' collector RF_UNCONFIGURE',
                    shell=True)
    pidfile = basedir + '/collector/pid.data'
    # 'with' closes the pid file; the original left it open.
    with open(pidfile, 'r') as pids:
        for pid in pids:
            # Strip the line terminator: left in place it splits the remote
            # command so that the next line begins with ';'.
            pid = pid.strip()
            if not pid:
                continue
            cmd = 'ssh ' + opshost + ' "kill ' + pid + '; removerb ' \
                + rbufinname + '; removerb ' + rbufoutname \
                + '; removeshm ' + shmname + '; clear_basf2_ipc"'
            print(cmd)
            subprocess.call(cmd, shell=True)
235 
236 
# Start event processor
238 
def run_eventprocessor(conffile):
    """Launch ``rf_eventprocessor`` on every processor node not in badlist.

    For node number i in [idbase, idbase + nnodes) the host is
    ``<ctlhostbase><i as 1 digit>`` and the node name is
    ``<nodebase><i as 2 digits>``.  ``<execdir_base>/evp_<nodename>`` is
    created locally if missing and the remote output is redirected to
    ``evp_<nodename>/nsmlog.log``.  The ssh sessions are not waited for.

    Args:
        conffile: Configuration name as accepted by get_rfgetconf().
    """
    hostbase = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    nodebase = get_rfgetconf(conffile, 'processor', 'nodebase')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')
    nnodes = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    badlist = get_rfgetconf(conffile, 'processor', 'badlist')
    confpath = get_configpath(conffile)  # loop-invariant

    for i in range(procid, procid + nnodes):
        nodenum = '%2.2d' % i
        nodeid = '%1.1d' % i  # DESY only
        # NOTE(review): substring match against badlist; a one-digit id
        # such as '1' also matches an entry like '10' -- confirm format.
        if nodeid in badlist:
            continue
        evphost = hostbase + nodeid
        nodename = nodebase + nodenum
        workdir = basedir + '/evp_' + nodename
        if not os.path.exists(workdir):
            os.mkdir(workdir)
        cmd = 'ssh ' + evphost + ' "cd ' + basedir + '; export NSM2_PORT=' \
            + port + '; rf_eventprocessor ' + confpath \
            + ' &> evp_' + nodename + '/nsmlog.log" '
        print(cmd)
        subprocess.Popen(cmd, shell=True)
        time.sleep(1)
263 
264 
# Stop event processor
266 
def stop_eventprocessor(conffile):
    """Unconfigure and kill every event processor not in badlist.

    For node number i in [idbase, idbase + nnodes) this sends
    RF_UNCONFIGURE to ``evp_<nodebase><i as 2 digits>`` via ``rfcommand``,
    then for each PID in ``<execdir_base>/evp_<...>/pid.data`` runs, on the
    processor host, ``kill``, ``removerb`` for both ring buffers,
    ``removeshm`` and ``clear_basf2_ipc``.

    Args:
        conffile: Configuration name as accepted by get_rfgetconf().
    """
    hostbase = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    nodebase = get_rfgetconf(conffile, 'processor', 'nodebase')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    nnodes = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    badlist = get_rfgetconf(conffile, 'processor', 'badlist')

    unit = get_rfgetconf(conffile, 'system', 'unitname')
    # NOTE(review): ring buffer names are read from the 'collector' section,
    # exactly as in stop_outputserver -- confirm this is intended here.
    rbufin = get_rfgetconf(conffile, 'collector', 'ringbufin')
    rbufout = get_rfgetconf(conffile, 'collector', 'ringbufout')
    rbufinname = unit + ':' + rbufin
    rbufoutname = unit + ':' + rbufout

    for i in range(procid, procid + nnodes):
        # Host suffix and badlist test use the one-digit id, matching
        # run_eventprocessor, so the kill is sent to the host the process
        # was launched on.  The directory/node name keeps two digits.
        hostid = '%1.1d' % i  # DESY only
        nodenum = '%2.2d' % i
        if hostid in badlist:
            continue
        evphost = hostbase + hostid
        nodename = 'evp_' + nodebase + nodenum
        shmname = unit + ':' + nodename
        print(shmname)
        subprocess.call('rfcommand ' + conffile + ' ' + nodename +
                        ' RF_UNCONFIGURE', shell=True)
        pidfile = basedir + '/' + nodename + '/pid.data'
        # 'with' closes the pid file; the original left it open.
        with open(pidfile, 'r') as pids:
            for pid in pids:
                # Strip the line terminator: left in place it splits the
                # remote command so that the next line begins with ';'.
                pid = pid.strip()
                if not pid:
                    continue
                cmd = 'ssh ' + evphost + ' "kill ' + pid + '; removerb ' \
                    + rbufinname + '; removerb ' + rbufoutname \
                    + '; removeshm ' + shmname + '; clear_basf2_ipc"'
                print(cmd)
                subprocess.call(cmd, shell=True)
304 
305 
306 # Run dqmserver
307 
def run_dqmserver(conffile):
    """Launch ``rf_dqmserver`` on the dqmserver host through ssh.

    Creates ``<execdir_base>/dqmserver`` locally if missing, then starts
    the server remotely with NSM2_PORT exported and its output redirected
    to ``dqmserver/nsmlog.log``.  The ssh session is not waited for.

    Args:
        conffile: Configuration name as accepted by get_rfgetconf().
    """
    dqmhost = get_rfgetconf(conffile, 'dqmserver', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')
    workdir = basedir + '/dqmserver'
    if not os.path.exists(workdir):
        os.mkdir(workdir)
    cmd = 'ssh ' + dqmhost + ' "cd ' + basedir + '; export NSM2_PORT=' + port \
        + '; rf_dqmserver ' + get_configpath(conffile) \
        + ' &> dqmserver/nsmlog.log" '
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
320 
321 
322 # Stop dqmserver
323 
def stop_dqmserver(conffile):
    """Unconfigure and kill the DQM server.

    Sends RF_UNCONFIGURE to ``dqmserver`` via ``rfcommand``, then kills, on
    the dqmserver host, each PID listed in
    ``<execdir_base>/dqmserver/pid.data``.

    Args:
        conffile: Configuration name as accepted by get_rfgetconf().
    """
    dqmhost = get_rfgetconf(conffile, 'dqmserver', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    subprocess.call('rfcommand ' + conffile + ' dqmserver RF_UNCONFIGURE',
                    shell=True)
    pidfile = basedir + '/dqmserver/pid.data'
    # 'with' closes the pid file; the original left it open.
    with open(pidfile, 'r') as pids:
        for pid in pids:
            # Drop the line terminator so it is not embedded in the command.
            pid = pid.strip()
            if not pid:
                continue
            cmd = 'ssh ' + dqmhost + ' "kill ' + pid + '"'
            print(cmd)
            subprocess.call(cmd, shell=True)
335 
336 
337 # Run roisender
338 
def run_roisender(conffile):
    """Launch ``rf_roisender`` on the roisender host through ssh.

    Creates ``<execdir_base>/roisender`` locally if missing, then starts
    the sender remotely with NSM2_PORT exported and its output redirected
    to ``roisender/nsmlog.log``.  The ssh session is not waited for.

    Args:
        conffile: Configuration name as accepted by get_rfgetconf().
    """
    roihost = get_rfgetconf(conffile, 'roisender', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')
    workdir = basedir + '/roisender'
    if not os.path.exists(workdir):
        os.mkdir(workdir)
    cmd = 'ssh ' + roihost + ' "cd ' + basedir + '; export NSM2_PORT=' + port \
        + '; rf_roisender ' + get_configpath(conffile) \
        + ' &> roisender/nsmlog.log" '
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
351 
352 
353 # Stop roisender
354 
def stop_roisender(conffile):
    """Unconfigure and kill the ROI sender, then remove its shared memory.

    Sends RF_UNCONFIGURE to ``roisender`` via ``rfcommand``, then for each
    PID listed in ``<execdir_base>/roisender/pid.data`` runs, on the
    roisender host, ``kill`` followed by ``removeshm``.

    Args:
        conffile: Configuration name as accepted by get_rfgetconf().
    """
    roihost = get_rfgetconf(conffile, 'roisender', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    unit = get_rfgetconf(conffile, 'system', 'unitname')
    shmname = unit + ':roisender'
    subprocess.call('rfcommand ' + conffile + ' roisender RF_UNCONFIGURE',
                    shell=True)
    pidfile = basedir + '/roisender/pid.data'
    # 'with' closes the pid file; the original left it open.
    with open(pidfile, 'r') as pids:
        for pid in pids:
            # Strip the line terminator: left in place it splits the remote
            # command so that the next line begins with ';'.
            pid = pid.strip()
            if not pid:
                continue
            cmd = 'ssh ' + roihost + ' "kill ' + pid + '; removeshm ' \
                + shmname + '"'
            print(cmd)
            subprocess.call(cmd, shell=True)
368 
369 
370 # Run local master
371 
def run_master(conffile):
    """Launch ``rf_master_local`` on the master host through ssh.

    Creates ``<execdir_base>/master`` locally if missing, then starts the
    master remotely with NSM2_PORT exported and its output redirected to
    ``master/nsmlog.log``.  The ssh session is not waited for.

    Args:
        conffile: Configuration name as accepted by get_rfgetconf().
    """
    masterhost = get_rfgetconf(conffile, 'master', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')
    workdir = basedir + '/master'
    if not os.path.exists(workdir):
        os.mkdir(workdir)
    cmd = 'ssh ' + masterhost + ' "cd ' + basedir + '; export NSM2_PORT=' \
        + port + '; rf_master_local ' + get_configpath(conffile) \
        + ' &> master/nsmlog.log" '
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
384 
385 
386 # Stop local master
387 
def stop_master(conffile):
    """Kill the local master.

    Kills, on the master host, each PID listed in
    ``<execdir_base>/master/pid.data``.

    Args:
        conffile: Configuration name as accepted by get_rfgetconf().
    """
    masterhost = get_rfgetconf(conffile, 'master', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    # NOTE: no RF_UNCONFIGURE is sent to master; that call is disabled:
    # p = subprocess.Popen("rfcommand " + conffile + " master RF_UNCONFIGURE", shell=True)
    # p.wait()
    pidfile = basedir + '/master/pid.data'
    # 'with' closes the pid file; the original left it open.
    with open(pidfile, 'r') as pids:
        for pid in pids:
            # Drop the line terminator so it is not embedded in the command.
            pid = pid.strip()
            if not pid:
                continue
            cmd = 'ssh ' + masterhost + ' "kill ' + pid + '"'
            print(cmd)
            subprocess.call(cmd, shell=True)
399 
400 
def start_rfarm_components(conffile):
    """Start all RFARM components except the master.

    Launch order: event processors, output server, event server,
    DQM server, ROI sender.
    """
    launchers = (
        run_eventprocessor,
        run_outputserver,
        run_eventserver,
        run_dqmserver,
        run_roisender,
    )
    for launch in launchers:
        launch(conffile)
407 
408 
409 # Stop RFARM components
410 
def stop_rfarm_components(conffile):
    """Stop all RFARM components except the master.

    Stop order: ROI sender, DQM server, event server, output server,
    event processors.
    """
    stoppers = (
        stop_roisender,
        stop_dqmserver,
        stop_eventserver,
        stop_outputserver,
        stop_eventprocessor,
    )
    for stop in stoppers:
        stop(conffile)
417 
418 
419 # Start RFARM local operation
420 
def start_rfarm_local(conffile):
    """Start the RFARM components first and the local master last."""
    for launch in (start_rfarm_components, run_master):
        launch(conffile)
424 
425 
426 # Stop RFARM local operation
427 
def stop_rfarm_local(conffile):
    """Stop the RFARM components first and the local master last."""
    # stop_eventprocessor(conffile)
    for stop in (stop_rfarm_components, stop_master):
        stop(conffile)