Belle II Software  release-05-01-25
rfarmutil.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 
4 import os
5 import sys
6 import subprocess
7 import signal
8 import socket
9 import time
10 
11 
12 # Basic Utilities
13 # Get full path of the configuration file
14 
def get_configpath(conffile):
    """Return the full path of the configuration file named *conffile*.

    The path is built as ``$RFARM_CONFDIR/<conffile>.conf``.

    If the RFARM_CONFDIR environment variable is not defined, a message is
    printed and the process exits with a non-zero status.
    """
    confdir = os.environ.get('RFARM_CONFDIR')
    # Test for a missing variable explicitly instead of comparing the
    # stringified value with 'None', which would also reject a directory
    # that is literally called "None".
    if confdir is None:
        print('RFARM_CONFDIR is not defined. Exit.')
        # A missing configuration directory is an error: report failure to
        # the calling shell rather than exiting with status 0.
        sys.exit(1)
    return confdir + '/' + conffile + '.conf'
22 
23 
24 # Get Configuration from config file
def get_rfgetconf(conffile, item1, item2='NULL', item3='NULL'):
    """Look up a configuration value with the external ``rfgetconf`` tool.

    Runs ``rfgetconf <config path> item1 item2 item3`` through the shell and
    returns whatever the tool wrote to its standard output, unmodified.
    Standard error of the tool is captured and discarded.

    The configuration path comes from get_configpath(), which terminates the
    process when RFARM_CONFDIR is not defined.
    """
    cmd = 'rfgetconf ' + get_configpath(conffile) + ' ' + item1 + ' ' + item2 \
        + ' ' + item3
    # universal_newlines=True makes the pipes text streams, so callers that
    # concatenate the result with other strings get a str.
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE, universal_newlines=True)
    # communicate() drains both pipes while waiting.  Calling wait() first and
    # reading afterwards can deadlock once the child fills a pipe buffer.
    output, _ = p.communicate()
    return output
39 
40 
41 # print "waiting"
42 # confout = p.stdout.read()
43 
44 # NSMD related utilities
45 # Run NSMD
46 
def run_nsmd(nsmdir, port, nsmhost):
    """Start ``nsmd2`` on *nsmhost* through ssh.

    A per-host directory ``<nsmdir>/<nsmhost>`` is created locally if it does
    not exist yet; the remote command changes into that directory and points
    NSMLOGDIR at it before launching ``nsmd2`` with *port* as both the ``-p``
    and ``-s`` argument.  The ssh process is not waited for; the function
    only sleeps for one second after spawning it.
    """
    # Directory used for logging
    hostdir = nsmdir + '/' + nsmhost
    if not os.path.exists(hostdir):
        os.mkdir(hostdir)

    daemon = ''.join(['nsmd2 -f -p ', port, ' -s ', port, ' -h '])
    cmd = ''.join([
        'ssh ', nsmhost, ' "cd ', nsmdir, '/', nsmhost,
        '; setenv NSMLOGDIR ', nsmdir, '/', nsmhost, ';', daemon,
        nsmhost, '"',
    ])
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
60 
61 
62 # p.wait()
63 
64 # Kill NSMD
65 
def kill_nsmd(port, nsmhost):
    """Kill the ``nsmd2`` processes on *nsmhost* whose ``ps`` line matches *port*.

    The PIDs are obtained by running ``ps -fC nsmd2 | grep <port>`` on the
    remote host through ssh and taking the second column of each line.  Every
    positive PID found is then killed with a separate ``ssh <host> "kill
    <pid>"`` call, which is waited for.

    The listing is read through a pipe, so no ``temp.pid`` scratch file is
    created in the current working directory.
    """
    # '\\$2' yields the two characters '\$', so the local shell hands a
    # literal '$2' to the remote awk instead of expanding it itself.
    list_cmd = 'ssh ' + nsmhost + ' "ps -fC nsmd2 | grep ' + port \
        + "| awk '{print \\$2}' \""
    lister = subprocess.Popen(list_cmd, shell=True, stdout=subprocess.PIPE,
                              universal_newlines=True)
    listing, _ = lister.communicate()

    for line in listing.splitlines():
        # Skip blank or non-numeric lines instead of failing on int().
        try:
            pid = int(line)
        except ValueError:
            continue
        if pid > 0:
            kill_cmd = 'ssh ' + nsmhost + ' "kill ' + str(pid) + '"'
            killer = subprocess.Popen(kill_cmd, shell=True)
            killer.wait()
80 
81 
82 # Start NSMD on all nodes
83 
def start_nsmd(conffile):
    """Start nsmd on every node listed in the configuration *conffile*.

    nsmd is started, in this order, on the master control host, on the
    distributor host (only when its name is not contained in the master host
    name), on the collector host, and on each processor node whose two-digit
    id does not appear in the processor ``badlist``.  A line is printed for
    each host on which nsmd was started.
    """
    # Global parameters
    nsmdir = get_rfgetconf(conffile, 'system', 'nsmdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')

    def launch(host):
        run_nsmd(nsmdir, port, host)
        print('nsmd on %s started' % host)

    # Control node
    ctlhost = get_rfgetconf(conffile, 'master', 'ctlhost')
    launch(ctlhost)

    # Event server node, unless its name is a substring of the control host
    evshost = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    if evshost not in ctlhost:
        launch(evshost)

    # Output server node
    launch(get_rfgetconf(conffile, 'collector', 'ctlhost'))

    # Event processor nodes
    nnodes = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    first_id = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    badlist = get_rfgetconf(conffile, 'processor', 'badlist')
    evphostbase = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    for number in range(first_id, first_id + nnodes):
        nodeid = '%2.2d' % number
        if nodeid in badlist:
            continue
        launch(evphostbase + nodeid)
116 
117 
def stop_nsmd(conffile):
    """Kill nsmd on every node listed in the configuration *conffile*.

    nsmd is killed, in this order, on the master control host, on the
    distributor host (only when its name is not contained in the master host
    name), on the collector host, and on each processor node whose two-digit
    id does not appear in the processor ``badlist``.  A line is printed for
    each host that was handled.
    """
    port = get_rfgetconf(conffile, 'system', 'nsmport')

    def terminate(host):
        kill_nsmd(port, host)
        print('nsmd on %s stopped' % host)

    # Kill nsmd on control node
    ctlhost = get_rfgetconf(conffile, 'master', 'ctlhost')
    terminate(ctlhost)

    # Kill nsmd on event server node, unless its name is a substring of the
    # control host
    evshost = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    if evshost not in ctlhost:
        terminate(evshost)

    # Kill nsmd on output server node
    terminate(get_rfgetconf(conffile, 'collector', 'ctlhost'))

    # Kill nsmd on event processor nodes
    nnodes = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    first_id = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    badlist = get_rfgetconf(conffile, 'processor', 'badlist')
    evphostbase = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    for number in range(first_id, first_id + nnodes):
        nodeid = '%2.2d' % number
        if nodeid in badlist:
            continue
        terminate(evphostbase + nodeid)
148 
149 
150 # RFARM server operations
151 # Run eventserver
152 
def run_eventserver(conffile):
    """Launch ``rf_eventserver`` on the distributor host through ssh.

    Creates ``<execdir_base>/distributor`` locally when it is missing, then
    spawns an ssh command that changes into ``execdir_base`` on the remote
    host, sets NSM2_PORT and starts ``rf_eventserver`` with the full
    configuration path, redirecting its output to
    ``distributor/nsmlog.log``.  The command is printed; the ssh process is
    not waited for, the function only sleeps for one second.
    """
    host = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    workdir = get_rfgetconf(conffile, 'system', 'execdir_base')
    nsmport = get_rfgetconf(conffile, 'system', 'nsmport')

    rundir = workdir + '/distributor'
    if not os.path.exists(rundir):
        os.mkdir(rundir)

    cmd = ''.join([
        'ssh ', host, ' "cd ', workdir, '; setenv NSM2_PORT ', nsmport,
        '; rf_eventserver ', get_configpath(conffile),
        ' > & distributor/nsmlog.log" ',
    ])
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
165 
166 
167 # Stop eventserver
168 
def stop_eventserver(conffile):
    """Stop the event server and remove its ring buffer and shared memory.

    For every PID listed in ``<execdir_base>/distributor/pid.data`` an ssh
    command is run on the distributor host that kills the process, then calls
    ``removerb <unit>:<ringbuffer>`` and ``removeshm <unit>:distributor``.
    Each command is printed and waited for.  Blank lines in the pid file are
    skipped.

    Raises whatever ``open()`` raises when the pid file cannot be read.
    """
    evshost = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    unit = get_rfgetconf(conffile, 'system', 'unitname')
    ringbuf = get_rfgetconf(conffile, 'distributor', 'ringbuffer')
    rbufname = unit + ':' + ringbuf
    shmname = unit + ':distributor'
    # NOTE: sending RC_ABORT through rfcommand before killing is disabled.
    pidfile = basedir + '/distributor/pid.data'
    # Read the pid file up front so the handle is closed deterministically.
    with open(pidfile, 'r') as handle:
        pids = [line.strip() for line in handle]
    for pid in pids:
        if not pid:
            continue
        # The pid is stripped so that its trailing newline does not end up in
        # the middle of the remote command line.
        cmd = 'ssh ' + evshost + ' "kill ' + pid + '; removerb ' + rbufname \
            + "; removeshm " + shmname + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
186 
187 
188 # Run outputserver
189 
def run_outputserver(conffile):
    """Launch ``rf_outputserver`` on the collector host through ssh.

    Creates ``<execdir_base>/collector`` locally when it is missing, then
    spawns an ssh command that changes into ``execdir_base`` on the remote
    host, sets NSM2_PORT and starts ``rf_outputserver`` with the full
    configuration path, redirecting its output to ``collector/nsmlog.log``.
    The command is printed; the ssh process is not waited for, the function
    only sleeps for one second.
    """
    host = get_rfgetconf(conffile, 'collector', 'ctlhost')
    workdir = get_rfgetconf(conffile, 'system', 'execdir_base')
    nsmport = get_rfgetconf(conffile, 'system', 'nsmport')

    rundir = workdir + '/collector'
    if not os.path.exists(rundir):
        os.mkdir(rundir)

    cmd = ''.join([
        'ssh ', host, ' "cd ', workdir, '; setenv NSM2_PORT ', nsmport,
        '; rf_outputserver ', get_configpath(conffile),
        ' > & collector/nsmlog.log" ',
    ])
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
202 
203 
204 # Stop outputserver
205 
def stop_outputserver(conffile):
    """Stop the output server and remove its ring buffers and shared memory.

    For every PID listed in ``<execdir_base>/collector/pid.data`` an ssh
    command is run on the collector host that kills the process, removes the
    input and output ring buffers (``<unit>:<ringbufin>`` and
    ``<unit>:<ringbufout>``), removes the shared memory ``<unit>:collector``
    and finally calls ``clear_basf2_ipc``.  Each command is printed and
    waited for.  Blank lines in the pid file are skipped.

    Raises whatever ``open()`` raises when the pid file cannot be read.
    """
    opshost = get_rfgetconf(conffile, 'collector', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    unit = get_rfgetconf(conffile, 'system', 'unitname')
    rbufin = get_rfgetconf(conffile, 'collector', 'ringbufin')
    rbufout = get_rfgetconf(conffile, 'collector', 'ringbufout')
    rbufinname = unit + ':' + rbufin
    rbufoutname = unit + ':' + rbufout
    shmname = unit + ':collector'
    # NOTE: sending RC_ABORT through rfcommand before killing is disabled.
    pidfile = basedir + '/collector/pid.data'
    # Read the pid file up front so the handle is closed deterministically.
    with open(pidfile, 'r') as handle:
        pids = [line.strip() for line in handle]
    for pid in pids:
        if not pid:
            continue
        # The pid is stripped so that its trailing newline does not end up in
        # the middle of the remote command line.
        cmd = 'ssh ' + opshost + ' "kill ' + pid + '; removerb ' + rbufinname \
            + '; removerb ' + rbufoutname + '; removeshm ' + shmname \
            + '; clear_basf2_ipc"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
225 
226 
227 # Start event processor
228 
def run_eventprocessor(conffile):
    """Launch ``rf_eventprocessor`` on every usable processor node.

    Node ids run from ``idbase`` to ``idbase + nnodes - 1`` and are formatted
    with two digits; ids that occur in the processor ``badlist`` are skipped.
    For each remaining node the directory ``<execdir_base>/evp_<nodebase><id>``
    is created locally when missing and an ssh command is spawned on
    ``<ctlhostbase><id>`` that changes into ``execdir_base``, sets NSM2_PORT
    and starts ``rf_eventprocessor`` with the full configuration path,
    redirecting its output to ``evp_<nodebase><id>/nsmlog.log``.  Each
    command is printed; the ssh processes are not waited for, the function
    sleeps for one second after each launch.
    """
    hostbase = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    nodebase = get_rfgetconf(conffile, 'processor', 'nodebase')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')
    nnodes = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    badlist = get_rfgetconf(conffile, 'processor', 'badlist')
    # The configuration path is the same for every node.
    confpath = get_configpath(conffile)

    for i in range(procid, procid + nnodes):
        nodeid = '%2.2d' % i
        if badlist.find(nodeid) != -1:
            continue
        evphost = hostbase + nodeid
        nodename = nodebase + nodeid
        rundir = basedir + '/evp_' + nodename
        if not os.path.exists(rundir):
            os.mkdir(rundir)
        cmd = 'ssh ' + evphost + ' "cd ' + basedir + '; setenv NSM2_PORT ' \
            + port + '; rf_eventprocessor ' + confpath \
            + ' > & evp_' + nodename + '/nsmlog.log" '
        print(cmd)
        subprocess.Popen(cmd, shell=True)
        # NOTE(review): the pause is kept directly after each launch, as in
        # the other run_* functions -- confirm against the original layout.
        time.sleep(1)
252 
253 
254 # Stop event processor
255 
def stop_eventprocessor(conffile):
    """Stop the event processors and clean up their IPC resources.

    Node ids run from ``idbase`` to ``idbase + nnodes - 1`` and are formatted
    with two digits; ids that occur in the processor ``badlist`` are skipped.
    For each remaining node the shared memory name ``<unit>:evp_<nodebase><id>``
    is printed, and for every PID listed in
    ``<execdir_base>/evp_<nodebase><id>/pid.data`` an ssh command is run on
    ``<ctlhostbase><id>`` that kills the process, removes the two ring
    buffers, removes the shared memory and calls ``clear_basf2_ipc``.  Each
    command is printed and waited for.  Blank lines in a pid file are skipped.

    Raises whatever ``open()`` raises when a pid file cannot be read; nodes
    after the failing one are then not processed.
    """
    hostbase = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    nodebase = get_rfgetconf(conffile, 'processor', 'nodebase')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    nnodes = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    badlist = get_rfgetconf(conffile, 'processor', 'badlist')

    unit = get_rfgetconf(conffile, 'system', 'unitname')
    # NOTE(review): the ring buffer names are read from the 'collector'
    # section, exactly as in stop_outputserver -- confirm this is intended.
    rbufin = get_rfgetconf(conffile, 'collector', 'ringbufin')
    rbufout = get_rfgetconf(conffile, 'collector', 'ringbufout')
    rbufinname = unit + ':' + rbufin
    rbufoutname = unit + ':' + rbufout

    for i in range(procid, procid + nnodes):
        nodeid = '%2.2d' % i
        if badlist.find(nodeid) != -1:
            continue
        evphost = hostbase + nodeid
        nodename = 'evp_' + nodebase + nodeid
        shmname = unit + ':' + nodename
        print(shmname)
        # NOTE: sending RC_ABORT through rfcommand before killing is disabled.
        pidfile = basedir + '/' + nodename + '/pid.data'
        # Read the pid file up front so the handle is closed deterministically.
        with open(pidfile, 'r') as handle:
            pids = [line.strip() for line in handle]
        for pid in pids:
            if not pid:
                continue
            # The pid is stripped so that its trailing newline does not end
            # up in the middle of the remote command line.
            cmd = 'ssh ' + evphost + ' "kill ' + pid + '; removerb ' \
                + rbufinname + '; removerb ' + rbufoutname \
                + '; removeshm ' + shmname + '; clear_basf2_ipc"'
            print(cmd)
            p = subprocess.Popen(cmd, shell=True)
            p.wait()
292 
293 
294 # Run dqmserver
295 
def run_dqmserver(conffile):
    """Launch ``rf_dqmserver`` on the dqmserver host through ssh.

    Creates ``<execdir_base>/dqmserver`` locally when it is missing, then
    spawns an ssh command that changes into ``execdir_base`` on the remote
    host, sets NSM2_PORT and starts ``rf_dqmserver`` with the full
    configuration path, redirecting its output to ``dqmserver/nsmlog.log``.
    The command is printed; the ssh process is not waited for, the function
    only sleeps for one second.
    """
    host = get_rfgetconf(conffile, 'dqmserver', 'ctlhost')
    workdir = get_rfgetconf(conffile, 'system', 'execdir_base')
    nsmport = get_rfgetconf(conffile, 'system', 'nsmport')

    rundir = workdir + '/dqmserver'
    if not os.path.exists(rundir):
        os.mkdir(rundir)

    cmd = ''.join([
        'ssh ', host, ' "cd ', workdir, '; setenv NSM2_PORT ', nsmport,
        '; rf_dqmserver ', get_configpath(conffile),
        ' > & dqmserver/nsmlog.log" ',
    ])
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
308 
309 
310 # Stop dqmserver
311 
def stop_dqmserver(conffile):
    """Kill the DQM server processes.

    For every PID listed in ``<execdir_base>/dqmserver/pid.data`` the command
    ``ssh <dqmserver host> "kill <pid>"`` is printed, run and waited for.
    Blank lines in the pid file are skipped.

    Raises whatever ``open()`` raises when the pid file cannot be read.
    """
    dqmhost = get_rfgetconf(conffile, 'dqmserver', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    # NOTE: sending RC_ABORT through rfcommand before killing is disabled.
    pidfile = basedir + '/dqmserver/pid.data'
    # Read the pid file up front so the handle is closed deterministically.
    with open(pidfile, 'r') as handle:
        pids = [line.strip() for line in handle]
    for pid in pids:
        if not pid:
            continue
        cmd = 'ssh ' + dqmhost + ' "kill ' + pid + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
323 
324 
325 # Run roisender
326 
def run_roisender(conffile):
    """Launch ``rf_roisender`` on the roisender host through ssh.

    Creates ``<execdir_base>/roisender`` locally when it is missing, then
    spawns an ssh command that changes into ``execdir_base`` on the remote
    host, sets NSM2_PORT and starts ``rf_roisender`` with the full
    configuration path, redirecting its output to ``roisender/nsmlog.log``.
    The command is printed; the ssh process is not waited for, the function
    only sleeps for one second.
    """
    host = get_rfgetconf(conffile, 'roisender', 'ctlhost')
    workdir = get_rfgetconf(conffile, 'system', 'execdir_base')
    nsmport = get_rfgetconf(conffile, 'system', 'nsmport')

    rundir = workdir + '/roisender'
    if not os.path.exists(rundir):
        os.mkdir(rundir)

    cmd = ''.join([
        'ssh ', host, ' "cd ', workdir, '; setenv NSM2_PORT ', nsmport,
        '; rf_roisender ', get_configpath(conffile),
        ' > & roisender/nsmlog.log" ',
    ])
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
339 
340 
341 # Stop roisender
342 
def stop_roisender(conffile):
    """Stop the ROI sender and remove its shared memory.

    For every PID listed in ``<execdir_base>/roisender/pid.data`` an ssh
    command is run on the roisender host that kills the process and then
    calls ``removeshm <unit>:roisender``.  Each command is printed and waited
    for.  Blank lines in the pid file are skipped.

    Raises whatever ``open()`` raises when the pid file cannot be read.
    """
    roihost = get_rfgetconf(conffile, 'roisender', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    unit = get_rfgetconf(conffile, 'system', 'unitname')
    shmname = unit + ':roisender'
    # NOTE: sending RC_ABORT through rfcommand before killing is disabled.
    pidfile = basedir + '/roisender/pid.data'
    # Read the pid file up front so the handle is closed deterministically.
    with open(pidfile, 'r') as handle:
        pids = [line.strip() for line in handle]
    for pid in pids:
        if not pid:
            continue
        # The pid is stripped so that its trailing newline does not end up in
        # the middle of the remote command line.
        cmd = 'ssh ' + roihost + ' "kill ' + pid + '; removeshm ' + shmname + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
356 
357 
358 # Run local master
359 
def run_master(conffile):
    """Launch ``rf_master_local`` on the master host through ssh.

    Creates ``<execdir_base>/master`` locally when it is missing, then spawns
    an ssh command that changes into ``execdir_base`` on the remote host,
    sets NSM2_PORT and starts ``rf_master_local`` with the full configuration
    path, redirecting its output to ``master/nsmlog.log``.  The command is
    printed; the ssh process is not waited for, the function only sleeps for
    one second.
    """
    host = get_rfgetconf(conffile, 'master', 'ctlhost')
    workdir = get_rfgetconf(conffile, 'system', 'execdir_base')
    nsmport = get_rfgetconf(conffile, 'system', 'nsmport')

    rundir = workdir + '/master'
    if not os.path.exists(rundir):
        os.mkdir(rundir)

    cmd = ''.join([
        'ssh ', host, ' "cd ', workdir, '; setenv NSM2_PORT ',
        nsmport, '; rf_master_local ', get_configpath(conffile),
        ' > & master/nsmlog.log" ',
    ])
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
372 
373 
374 # Stop local master
375 
def stop_master(conffile):
    """Kill the local master processes.

    For every PID listed in ``<execdir_base>/master/pid.data`` the command
    ``ssh <master host> "kill <pid>"`` is printed, run and waited for.  Blank
    lines in the pid file are skipped.

    Raises whatever ``open()`` raises when the pid file cannot be read.
    """
    masterhost = get_rfgetconf(conffile, 'master', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    # NOTE: sending RF_UNCONFIGURE through rfcommand before killing is
    # disabled.
    pidfile = basedir + '/master/pid.data'
    # Read the pid file up front so the handle is closed deterministically.
    with open(pidfile, 'r') as handle:
        pids = [line.strip() for line in handle]
    for pid in pids:
        if not pid:
            continue
        cmd = 'ssh ' + masterhost + ' "kill ' + pid + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
387 
388 
def start_rfarm_components(conffile):
    """Start all RFARM components except the master.

    The launch order is: event processors, output server, event server,
    DQM server, ROI sender.
    """
    launchers = (
        run_eventprocessor,
        run_outputserver,
        run_eventserver,
        run_dqmserver,
        run_roisender,
    )
    for launch in launchers:
        launch(conffile)
395 
396 
397 # Stop RFARM components
398 
def stop_rfarm_components(conffile):
    """Stop the ROI sender, DQM server, event server and output server.

    The components are stopped in that order; progress messages are printed
    around the DQM server shutdown.  Event processors are not stopped by this
    function: the stop_eventprocessor call is commented out here.
    """
    stop_roisender(conffile)

    print("stop dqmserver")
    stop_dqmserver(conffile)
    print("done")

    for halt in (stop_eventserver, stop_outputserver):
        halt(conffile)
    # stop_eventprocessor(conffile)
407 
408 
409 # Start RFARM local operation
410 
def start_rfarm_local(conffile):
    """Start RFARM for local operation.

    All components are started first, then the local master.
    """
    for bring_up in (start_rfarm_components, run_master):
        bring_up(conffile)
414 
415 
416 # Stop RFARM local operation
417 
def stop_rfarm_local(conffile):
    """Stop RFARM local operation.

    Stops the event processors, then the remaining components, then the
    local master, printing a progress message before each stage.
    """
    stages = (
        ("stopping eventprocessors", stop_eventprocessor),
        ("stopping rfarm components", stop_rfarm_components),
        ("stopping master", stop_master),
    )
    for banner, shut_down in stages:
        print(banner)
        shut_down(conffile)