Belle II Software  release-08-01-10
rfarmutil_bash.py
1 #!/usr/bin/env python
2 # DESY-TB special Nov.8, 2016, R.Itoh
3 # -*- coding: utf-8 -*-
4 
5 
12 
13 import os
14 import sys
15 import subprocess
16 import time
17 
18 
19 # Basic Utilities
20 # Get full path of the configuration file
21 
def get_configpath(conffile):
    """Return the full path of the configuration file named *conffile*.

    The path is built as ``$RFARM_CONFDIR/<conffile>.conf``.

    Args:
        conffile: Base name of the configuration file, without the
            ``.conf`` extension.

    Returns:
        The string ``<RFARM_CONFDIR>/<conffile>.conf``.

    Raises:
        SystemExit: With status 1 when ``RFARM_CONFDIR`` is not set in the
            environment.
    """
    confdir = os.environ.get('RFARM_CONFDIR')
    if confdir is None:
        print('RFARM_CONFDIR is not defined. Exit.')
        # Non-zero status so that a calling script can detect the failure.
        sys.exit(1)
    return confdir + '/' + conffile + '.conf'
29 
30 
31 # Get Configuration from config file
def get_rfgetconf(conffile, item1, item2='NULL', item3='NULL'):
    """Read one configuration value through the external ``rfgetconf`` tool.

    Runs ``rfgetconf <config path> <item1> <item2> <item3>`` in a shell and
    returns whatever the tool wrote to its standard output.

    Args:
        conffile: Base name of the configuration file; resolved to a full
            path with ``get_configpath``.
        item1: First lookup key passed to ``rfgetconf``.
        item2: Second lookup key; the literal ``'NULL'`` by default.
        item3: Third lookup key; the literal ``'NULL'`` by default.

    Returns:
        The standard output of ``rfgetconf`` as text (``str``), unmodified.
        NOTE(review): no whitespace is stripped here; callers splice the
        value straight into command lines, so this assumes ``rfgetconf``
        prints the bare value - confirm.

    Raises:
        SystemExit: Propagated from ``get_configpath`` when
            ``RFARM_CONFDIR`` is not defined.
    """
    cmd = 'rfgetconf ' + get_configpath(conffile) + ' ' + item1 + ' ' \
        + item2 + ' ' + item3
    # Text mode makes the result a str, which every caller concatenates with
    # other str values to build command lines.
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE, universal_newlines=True)
    # communicate() drains both pipes while waiting; calling wait() first can
    # block forever once the child fills a pipe buffer.
    output, _ = p.communicate()
    return output
46 
47 
48 # print("waiting")
49 # confout = p.stdout.read()
50 
51 # NSMD related utilities
52 # Run NSMD
53 
def run_nsmd(nsmdir, port, nsmhost):
    """Launch ``nsmd2`` on *nsmhost* through ssh.

    The directory ``<nsmdir>/<nsmhost>`` is created on the local side when it
    does not exist yet.  The remote command changes into that same path,
    exports it as ``NSMLOGDIR`` and starts the daemon.  The ssh process is
    not waited for; the function returns after a one second pause.

    Args:
        nsmdir: Base directory that holds one sub-directory per host.
        port: Port number as a string, passed to both ``-p`` and ``-s``.
        nsmhost: Host to ssh into; also passed to ``-h``.
    """
    hostdir = nsmdir + '/' + nsmhost
    if not os.path.exists(hostdir):
        os.mkdir(hostdir)

    # An unset BELLE2_EXTERNALS_BIN ends up as the literal text 'None' here.
    bindir = str(os.environ.get('BELLE2_EXTERNALS_BIN'))
    daemon = bindir + '/nsmd2 -f -p ' + port + ' -s ' + port + ' -h ' + nsmhost
    remote = 'cd ' + hostdir + '; export NSMLOGDIR=' + hostdir + ';' + daemon
    cmd = 'ssh ' + nsmhost + ' "' + remote + '"'
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
66 
67 
68 # p.wait()
69 
70 # Kill NSMD
71 
def kill_nsmd(port, nsmhost):
    """Kill the ``nsmd2`` processes on *nsmhost* whose ``ps`` entry mentions *port*.

    The process list is obtained by running ``ps -fC nsmd2 | grep <port>``
    on the remote host through ssh and taking the second column of each
    line as the PID.  Every positive PID is then killed with a separate
    ``ssh <nsmhost> "kill <pid>"`` call.

    Args:
        port: Port number as a string, used as the ``grep`` pattern.
        nsmhost: Host on which the daemon is searched for and killed.
    """
    # '\\$2' reaches the local shell as '\$2', so that "$2" is expanded by
    # the remote awk and not by the local shell.
    cmd = 'ssh ' + nsmhost + ' "ps -fC nsmd2 | grep ' + port \
        + "| awk '{print \\$2}' \""
    # Read the PID list through a pipe instead of a fixed-name scratch file
    # in the current directory.
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                         universal_newlines=True)
    listing, _ = p.communicate()
    for line in listing.splitlines():
        try:
            pid = int(line)
        except ValueError:
            # Blank or non-numeric line: nothing to kill.
            continue
        if pid > 0:
            cmd = 'ssh ' + nsmhost + ' "kill ' + str(pid) + '"'
            p = subprocess.Popen(cmd, shell=True)
            p.wait()
86 
87 
88 # Start NSMD on all nodes
89 
def start_nsmd(conffile):
    """Start nsmd on every node listed in the configuration *conffile*.

    Nodes are handled in this order: control node, event server node (only
    when its host name is not contained in the control host name), output
    server node, then each event processor node whose id is not found in
    the configured bad list.

    Args:
        conffile: Base name of the configuration file.
    """
    # Settings shared by all nodes.
    nsmdir = get_rfgetconf(conffile, 'system', 'nsmdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')

    def launch(host):
        run_nsmd(nsmdir, port, host)
        print('nsmd on %s started' % host)

    # Control node
    ctlhost = get_rfgetconf(conffile, 'master', 'ctlhost')
    launch(ctlhost)

    # Event server node; skipped when its name is a substring of ctlhost.
    evshost = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    if evshost not in ctlhost:
        launch(evshost)

    # Output server node
    opshost = get_rfgetconf(conffile, 'collector', 'ctlhost')
    launch(opshost)

    # Event processor nodes
    nnodes = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    first = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    badlist = get_rfgetconf(conffile, 'processor', 'badlist')
    evphostbase = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    for number in range(first, first + nnodes):
        nodeid = '%1.1d' % number  # DESY only: no zero padding
        if badlist.find(nodeid) != -1:
            continue
        launch(evphostbase + nodeid)
123 
124 
def stop_nsmd(conffile):
    """Kill nsmd on every node listed in the configuration *conffile*.

    Nodes are handled in this order: control node, event server node (only
    when its host name is not contained in the control host name), output
    server node, then each event processor node whose id is not found in
    the configured bad list.

    Args:
        conffile: Base name of the configuration file.
    """
    port = get_rfgetconf(conffile, 'system', 'nsmport')

    def terminate(host):
        kill_nsmd(port, host)
        print('nsmd on %s stopped' % host)

    # Control node
    ctlhost = get_rfgetconf(conffile, 'master', 'ctlhost')
    terminate(ctlhost)

    # Event server node; skipped when its name is a substring of ctlhost.
    evshost = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    if evshost not in ctlhost:
        terminate(evshost)

    # Output server node
    opshost = get_rfgetconf(conffile, 'collector', 'ctlhost')
    terminate(opshost)

    # Event processor nodes
    nnodes = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    first = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    badlist = get_rfgetconf(conffile, 'processor', 'badlist')
    evphostbase = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    for number in range(first, first + nnodes):
        nodeid = '%1.1d' % number  # DESY only: no zero padding
        if badlist.find(nodeid) != -1:
            continue
        terminate(evphostbase + nodeid)
156 
157 
158 # RFARM server operations
159 # Run eventserver
160 
def run_eventserver(conffile):
    """Start ``rf_eventserver`` on the distributor host through ssh.

    Creates ``<execdir_base>/distributor`` locally when it is missing, then
    launches the server in the background with ``NSM2_PORT`` exported and
    its output redirected to ``distributor/nsmlog.log``.  Returns after a
    one second pause.

    Args:
        conffile: Base name of the configuration file.
    """
    host = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    execdir = get_rfgetconf(conffile, 'system', 'execdir_base')
    nsmport = get_rfgetconf(conffile, 'system', 'nsmport')

    workdir = execdir + '/distributor'
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    remote = 'cd ' + execdir + '; export NSM2_PORT=' + nsmport \
        + '; rf_eventserver ' + get_configpath(conffile) \
        + ' &> distributor/nsmlog.log'
    cmd = 'ssh ' + host + ' "' + remote + '" '
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
173 
174 
175 # Stop eventserver
176 
def stop_eventserver(conffile):
    """Stop the event server and remove its ring buffer and shared memory.

    First sends ``RF_UNCONFIGURE`` to the ``distributor`` node through
    ``rfcommand``.  Then, for every PID listed in
    ``<execdir_base>/distributor/pid.data``, runs on the distributor host:
    ``kill <pid>; removerb <unit>:<ringbuffer>; removeshm <unit>:distributor``.

    Args:
        conffile: Base name of the configuration file.

    Raises:
        IOError/OSError: If the pid file cannot be opened.
    """
    evshost = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    unit = get_rfgetconf(conffile, 'system', 'unitname')
    ringbuf = get_rfgetconf(conffile, 'distributor', 'ringbuffer')
    rbufname = unit + ':' + ringbuf
    shmname = unit + ':distributor'
    p = subprocess.Popen('rfcommand ' + conffile +
                         ' distributor RF_UNCONFIGURE', shell=True)
    p.wait()
    pidfile = basedir + '/distributor/pid.data'
    # Read all PIDs first so the file is closed before any ssh call.  The
    # line terminator is stripped; left in, it would split the remote
    # command in the middle.
    with open(pidfile, 'r') as handle:
        pids = [line.strip() for line in handle]
    for pid in pids:
        if not pid:
            continue  # blank line in the pid file
        cmd = 'ssh ' + evshost + ' "kill ' + pid + '; removerb ' + rbufname \
            + '; removeshm ' + shmname + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
194 
195 
196 # Run outputserver
197 
def run_outputserver(conffile):
    """Start ``rf_outputserver`` on the collector host through ssh.

    Creates ``<execdir_base>/collector`` locally when it is missing, then
    launches the server in the background with ``NSM2_PORT`` exported and
    its output redirected to ``collector/nsmlog.log``.  Returns after a one
    second pause.

    Args:
        conffile: Base name of the configuration file.
    """
    host = get_rfgetconf(conffile, 'collector', 'ctlhost')
    execdir = get_rfgetconf(conffile, 'system', 'execdir_base')
    nsmport = get_rfgetconf(conffile, 'system', 'nsmport')

    workdir = execdir + '/collector'
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    remote = 'cd ' + execdir + '; export NSM2_PORT=' + nsmport \
        + '; rf_outputserver ' + get_configpath(conffile) \
        + ' &> collector/nsmlog.log'
    cmd = 'ssh ' + host + ' "' + remote + '" '
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
210 
211 
212 # Stop outputserver
213 
def stop_outputserver(conffile):
    """Stop the output server and remove its ring buffers and shared memory.

    First sends ``RF_UNCONFIGURE`` to the ``collector`` node through
    ``rfcommand``.  Then, for every PID listed in
    ``<execdir_base>/collector/pid.data``, runs on the collector host:
    ``kill <pid>``, ``removerb`` for the input and output ring buffers,
    ``removeshm <unit>:collector`` and ``clear_basf2_ipc``.

    Args:
        conffile: Base name of the configuration file.

    Raises:
        IOError/OSError: If the pid file cannot be opened.
    """
    opshost = get_rfgetconf(conffile, 'collector', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    unit = get_rfgetconf(conffile, 'system', 'unitname')
    rbufin = get_rfgetconf(conffile, 'collector', 'ringbufin')
    rbufout = get_rfgetconf(conffile, 'collector', 'ringbufout')
    rbufinname = unit + ':' + rbufin
    rbufoutname = unit + ':' + rbufout
    shmname = unit + ':collector'
    p = subprocess.Popen('rfcommand ' + conffile + ' collector RF_UNCONFIGURE',
                         shell=True)
    p.wait()
    pidfile = basedir + '/collector/pid.data'
    # Read all PIDs first so the file is closed before any ssh call.  The
    # line terminator is stripped; left in, it would split the remote
    # command in the middle.
    with open(pidfile, 'r') as handle:
        pids = [line.strip() for line in handle]
    for pid in pids:
        if not pid:
            continue  # blank line in the pid file
        cmd = 'ssh ' + opshost + ' "kill ' + pid + '; removerb ' + rbufinname \
            + '; removerb ' + rbufoutname + '; removeshm ' + shmname \
            + '; clear_basf2_ipc"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
233 
234 
# Start event processor
236 
def run_eventprocessor(conffile):
    """Start ``rf_eventprocessor`` on every usable event processor node.

    For each node number in ``[idbase, idbase + nnodes)`` whose unpadded id
    is not found in the configured bad list, creates
    ``<execdir_base>/evp_<nodebase><NN>`` locally when it is missing and
    launches the processor through ssh in the background, with
    ``NSM2_PORT`` exported and output redirected to
    ``evp_<nodebase><NN>/nsmlog.log``.  Pauses one second after each launch.

    Args:
        conffile: Base name of the configuration file.
    """
    hostbase = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    nodebase = get_rfgetconf(conffile, 'processor', 'nodebase')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')
    nnodes = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    badlist = get_rfgetconf(conffile, 'processor', 'badlist')
    # The configuration path is the same for every node.
    confpath = get_configpath(conffile)

    for i in range(procid, procid + nnodes):
        nodenum = '%2.2d' % i  # zero-padded, used in the node name
        nodeid = '%1.1d' % i  # DESY only: unpadded, used in the host name
        if badlist.find(nodeid) == -1:
            evphost = hostbase + nodeid
            nodename = nodebase + nodenum
            if not os.path.exists(basedir + '/evp_' + nodename):
                os.mkdir(basedir + '/evp_' + nodename)
            cmd = 'ssh ' + evphost + ' "cd ' + basedir + '; export NSM2_PORT=' \
                + port + '; rf_eventprocessor ' + confpath \
                + ' &> evp_' + nodename + '/nsmlog.log" '
            print(cmd)
            subprocess.Popen(cmd, shell=True)
            time.sleep(1)
261 
262 
# Stop event processor
264 
def stop_eventprocessor(conffile):
    """Stop the event processor on every usable node and clean up its IPC.

    For each node number in ``[idbase, idbase + nnodes)`` whose zero-padded
    id is not found in the configured bad list, sends ``RF_UNCONFIGURE`` to
    ``evp_<nodebase><NN>`` through ``rfcommand``.  Then, for every PID
    listed in ``<execdir_base>/evp_<nodebase><NN>/pid.data``, runs on the
    node: ``kill <pid>``, ``removerb`` for the collector input and output
    ring buffers, ``removeshm <unit>:evp_<nodebase><NN>`` and
    ``clear_basf2_ipc``.

    Args:
        conffile: Base name of the configuration file.

    Raises:
        IOError/OSError: If a pid file cannot be opened.
    """
    hostbase = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    nodebase = get_rfgetconf(conffile, 'processor', 'nodebase')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    nnodes = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    badlist = get_rfgetconf(conffile, 'processor', 'badlist')

    unit = get_rfgetconf(conffile, 'system', 'unitname')
    rbufin = get_rfgetconf(conffile, 'collector', 'ringbufin')
    rbufout = get_rfgetconf(conffile, 'collector', 'ringbufout')
    rbufinname = unit + ':' + rbufin
    rbufoutname = unit + ':' + rbufout

    for i in range(procid, procid + nnodes):
        # NOTE(review): run_eventprocessor builds the host name and the
        # bad-list key from the unpadded '%1.1d' id, while this function
        # uses the zero-padded id for both - confirm which is intended.
        nodeid = '%2.2d' % i
        if badlist.find(nodeid) == -1:
            evphost = hostbase + nodeid
            nodename = 'evp_' + nodebase + nodeid
            shmname = unit + ':' + nodename
            print(shmname)
            p = subprocess.Popen('rfcommand ' + conffile + ' ' + nodename +
                                 ' RF_UNCONFIGURE', shell=True)
            p.wait()
            pidfile = basedir + '/' + nodename + '/pid.data'
            # Read all PIDs first so the file is closed before any ssh call.
            # The line terminator is stripped; left in, it would split the
            # remote command in the middle.
            with open(pidfile, 'r') as handle:
                pids = [line.strip() for line in handle]
            for pid in pids:
                if not pid:
                    continue  # blank line in the pid file
                cmd = 'ssh ' + evphost + ' "kill ' + pid + '; removerb ' \
                    + rbufinname + '; removerb ' + rbufoutname \
                    + '; removeshm ' + shmname + '; clear_basf2_ipc"'
                print(cmd)
                p = subprocess.Popen(cmd, shell=True)
                p.wait()
302 
303 
304 # Run dqmserver
305 
def run_dqmserver(conffile):
    """Start ``rf_dqmserver`` on the DQM server host through ssh.

    Creates ``<execdir_base>/dqmserver`` locally when it is missing, then
    launches the server in the background with ``NSM2_PORT`` exported and
    its output redirected to ``dqmserver/nsmlog.log``.  Returns after a one
    second pause.

    Args:
        conffile: Base name of the configuration file.
    """
    host = get_rfgetconf(conffile, 'dqmserver', 'ctlhost')
    execdir = get_rfgetconf(conffile, 'system', 'execdir_base')
    nsmport = get_rfgetconf(conffile, 'system', 'nsmport')

    workdir = execdir + '/dqmserver'
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    remote = 'cd ' + execdir + '; export NSM2_PORT=' + nsmport \
        + '; rf_dqmserver ' + get_configpath(conffile) \
        + ' &> dqmserver/nsmlog.log'
    cmd = 'ssh ' + host + ' "' + remote + '" '
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
318 
319 
320 # Stop dqmserver
321 
def stop_dqmserver(conffile):
    """Stop the DQM server.

    First sends ``RF_UNCONFIGURE`` to the ``dqmserver`` node through
    ``rfcommand``, then kills every PID listed in
    ``<execdir_base>/dqmserver/pid.data`` on the DQM server host via ssh.

    Args:
        conffile: Base name of the configuration file.

    Raises:
        IOError/OSError: If the pid file cannot be opened.
    """
    dqmhost = get_rfgetconf(conffile, 'dqmserver', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    p = subprocess.Popen('rfcommand ' + conffile + ' dqmserver RF_UNCONFIGURE',
                         shell=True)
    p.wait()
    pidfile = basedir + '/dqmserver/pid.data'
    # Read all PIDs first so the file is closed before any ssh call; strip
    # the line terminator so it does not end up inside the remote command.
    with open(pidfile, 'r') as handle:
        pids = [line.strip() for line in handle]
    for pid in pids:
        if not pid:
            continue  # blank line in the pid file
        cmd = 'ssh ' + dqmhost + ' "kill ' + pid + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
333 
334 
335 # Run roisender
336 
def run_roisender(conffile):
    """Start ``rf_roisender`` on the ROI sender host through ssh.

    Creates ``<execdir_base>/roisender`` locally when it is missing, then
    launches the sender in the background with ``NSM2_PORT`` exported and
    its output redirected to ``roisender/nsmlog.log``.  Returns after a one
    second pause.

    Args:
        conffile: Base name of the configuration file.
    """
    host = get_rfgetconf(conffile, 'roisender', 'ctlhost')
    execdir = get_rfgetconf(conffile, 'system', 'execdir_base')
    nsmport = get_rfgetconf(conffile, 'system', 'nsmport')

    workdir = execdir + '/roisender'
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    remote = 'cd ' + execdir + '; export NSM2_PORT=' + nsmport \
        + '; rf_roisender ' + get_configpath(conffile) \
        + ' &> roisender/nsmlog.log'
    cmd = 'ssh ' + host + ' "' + remote + '" '
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
349 
350 
351 # Stop roisender
352 
def stop_roisender(conffile):
    """Stop the ROI sender and remove its shared memory.

    First sends ``RF_UNCONFIGURE`` to the ``roisender`` node through
    ``rfcommand``.  Then, for every PID listed in
    ``<execdir_base>/roisender/pid.data``, runs on the ROI sender host:
    ``kill <pid>; removeshm <unit>:roisender``.

    Args:
        conffile: Base name of the configuration file.

    Raises:
        IOError/OSError: If the pid file cannot be opened.
    """
    roihost = get_rfgetconf(conffile, 'roisender', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    unit = get_rfgetconf(conffile, 'system', 'unitname')
    shmname = unit + ':roisender'
    p = subprocess.Popen('rfcommand ' + conffile + ' roisender RF_UNCONFIGURE',
                         shell=True)
    p.wait()
    pidfile = basedir + '/roisender/pid.data'
    # Read all PIDs first so the file is closed before any ssh call.  The
    # line terminator is stripped; left in, it would split the remote
    # command in the middle.
    with open(pidfile, 'r') as handle:
        pids = [line.strip() for line in handle]
    for pid in pids:
        if not pid:
            continue  # blank line in the pid file
        cmd = 'ssh ' + roihost + ' "kill ' + pid + '; removeshm ' + shmname + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
366 
367 
368 # Run local master
369 
def run_master(conffile):
    """Start ``rf_master_local`` on the master host through ssh.

    Creates ``<execdir_base>/master`` locally when it is missing, then
    launches the master in the background with ``NSM2_PORT`` exported and
    its output redirected to ``master/nsmlog.log``.  Returns after a one
    second pause.

    Args:
        conffile: Base name of the configuration file.
    """
    host = get_rfgetconf(conffile, 'master', 'ctlhost')
    execdir = get_rfgetconf(conffile, 'system', 'execdir_base')
    nsmport = get_rfgetconf(conffile, 'system', 'nsmport')

    workdir = execdir + '/master'
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    remote = 'cd ' + execdir + '; export NSM2_PORT=' + nsmport \
        + '; rf_master_local ' + get_configpath(conffile) \
        + ' &> master/nsmlog.log'
    cmd = 'ssh ' + host + ' "' + remote + '" '
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
382 
383 
384 # Stop local master
385 
def stop_master(conffile):
    """Stop the local master.

    Kills every PID listed in ``<execdir_base>/master/pid.data`` on the
    master host via ssh.  Unlike the other ``stop_*`` helpers, no
    ``RF_UNCONFIGURE`` command is sent first.

    Args:
        conffile: Base name of the configuration file.

    Raises:
        IOError/OSError: If the pid file cannot be opened.
    """
    masterhost = get_rfgetconf(conffile, 'master', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    pidfile = basedir + '/master/pid.data'
    # Read all PIDs first so the file is closed before any ssh call; strip
    # the line terminator so it does not end up inside the remote command.
    with open(pidfile, 'r') as handle:
        pids = [line.strip() for line in handle]
    for pid in pids:
        if not pid:
            continue  # blank line in the pid file
        cmd = 'ssh ' + masterhost + ' "kill ' + pid + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
397 
398 
def start_rfarm_components(conffile):
    """Start all RFARM components except the master.

    Start order: event processors, output server, event server, DQM server,
    ROI sender.

    Args:
        conffile: Base name of the configuration file.
    """
    startup_sequence = (
        run_eventprocessor,
        run_outputserver,
        run_eventserver,
        run_dqmserver,
        run_roisender,
    )
    for start in startup_sequence:
        start(conffile)
405 
406 
407 # Stop RFARM components
408 
def stop_rfarm_components(conffile):
    """Stop all RFARM components except the master.

    Stop order: ROI sender, DQM server, event server, output server, event
    processors.

    Args:
        conffile: Base name of the configuration file.
    """
    shutdown_sequence = (
        stop_roisender,
        stop_dqmserver,
        stop_eventserver,
        stop_outputserver,
        stop_eventprocessor,
    )
    for stop in shutdown_sequence:
        stop(conffile)
415 
416 
417 # Start RFARM local operation
418 
def start_rfarm_local(conffile):
    """Start RFARM local operation: all components first, then the master.

    Args:
        conffile: Base name of the configuration file.
    """
    for start in (start_rfarm_components, run_master):
        start(conffile)
422 
423 
424 # Stop RFARM local operation
425 
def stop_rfarm_local(conffile):
    """Stop RFARM local operation: all components first, then the master.

    Args:
        conffile: Base name of the configuration file.
    """
    for stop in (stop_rfarm_components, stop_master):
        stop(conffile)