Belle II Software  release-06-02-00
erecoutil.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 
4 
11 
12 import os
13 import sys
14 import subprocess
15 import signal
16 import socket
17 import time
18 
19 
20 # Basic Utilities
21 # Get full path of the configuration file
22 
def get_configpath(conffile):
    """Return the full path of the configuration file named ``conffile``.

    The directory is taken from the ``ERECO_CONFDIR`` environment variable
    and ``.conf`` is appended to ``conffile``.  When the variable is not
    set, a message is printed and the interpreter exits through
    ``sys.exit()`` (i.e. ``SystemExit`` is raised).
    """
    confdir = os.environ.get('ERECO_CONFDIR')
    # Test for the missing variable directly instead of comparing str(None)
    # with 'None', which would also reject a directory literally named "None".
    if confdir is None:
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print('ERECO_CONFDIR is not defined. Exit.')
        sys.exit()
    return confdir + '/' + conffile + '.conf'
30 
31 
32 # Get Configuration from config file
def get_ergetconf(conffile, item1, item2='NULL', item3='NULL'):
    """Return the standard output of ``rfgetconf`` for one configuration item.

    The command ``rfgetconf <config path> <item1> <item2> <item3>`` is run
    through the shell and whatever it writes to stdout is returned as text,
    unmodified.  Its stderr is captured and discarded.

    Arguments:
        conffile: configuration name, resolved with ``get_configpath``
        item1, item2, item3: keys handed to ``rfgetconf``; unused levels are
            passed as the literal string ``'NULL'``
    """
    # get_configpath() already exits when ERECO_CONFDIR is undefined, so the
    # check does not need to be repeated here.
    cmd = ' '.join(['rfgetconf', get_configpath(conffile), item1, item2, item3])
    # universal_newlines=True makes the pipes text-mode, so the result is a
    # str on Python 3 as well and can be concatenated by the callers.
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE, universal_newlines=True)
    # communicate() drains both pipes while waiting; calling wait() first can
    # deadlock once the child fills a pipe buffer.
    output, _ = p.communicate()
    return output
47 
48 
49 # print "waiting"
50 # confout = p.stdout.read()
51 
52 # NSMD related utilities
53 # Run NSMD
54 
def run_nsmd(nsmdir, port, nsmhost):
    """Launch ``nsmd2`` on ``nsmhost`` through ssh.

    The directory ``<nsmdir>/<nsmhost>`` is created locally when it does not
    exist.  The remote command changes into that same path, sets
    ``NSMLOGDIR`` to it and runs ``nsmd2 -f -p <port> -s <port> -h <nsmhost>``.
    The ssh process is not waited for; the function only sleeps one second
    before returning.

    Arguments:
        nsmdir: base directory containing one sub-directory per host
        port: NSM port, inserted into the command line as text
        nsmhost: host on which nsmd2 is started
    """
    # Directory used for logging (and as working directory of nsmd2).
    logdir = nsmdir + '/' + nsmhost
    if not os.path.exists(logdir):
        os.mkdir(logdir)
    # NOTE(review): ``setenv`` is csh syntax, so the remote login shell is
    # presumably csh/tcsh -- confirm before changing the command.
    cmd = 'ssh %s "cd %s; setenv NSMLOGDIR %s;nsmd2 -f -p %s -s %s -h %s"' % (
        nsmhost, logdir, logdir, port, port, nsmhost)
    print(cmd)
    # Fire and forget: nsmd2 keeps running in the foreground of the ssh session.
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
68 
69 
70 # p.wait()
71 
72 # Kill NSMD
73 
def kill_nsmd(port, nsmhost):
    """Kill the ``nsmd2`` processes on ``nsmhost`` whose ``ps`` line matches ``port``.

    The process list is obtained with ``ps -fC nsmd2 | grep <port>`` on the
    remote host and the second column of each matching line is taken as the
    process id.  Every positive pid is then killed with a separate
    ``ssh <nsmhost> "kill <pid>"``.

    Arguments:
        port: text searched for in the ``ps`` output (the NSM port)
        nsmhost: host on which nsmd2 is to be stopped
    """
    # '\\$2' yields backslash + '$2'; the backslash stops the local shell
    # from expanding $2 inside the double quotes, so awk receives it intact.
    # It is the same string value as the former '\$2', without relying on an
    # unrecognised escape sequence.
    list_cmd = ('ssh ' + nsmhost + ' "ps -fC nsmd2 | grep ' + port
                + "| awk '{print \\$2}' \"")
    # Read the pids from a pipe instead of a fixed-name scratch file in the
    # current directory, which could collide between invocations and was
    # never closed.
    p = subprocess.Popen(list_cmd, shell=True, stdout=subprocess.PIPE,
                         universal_newlines=True)
    output, _ = p.communicate()
    for line in output.splitlines():
        line = line.strip()
        if not line:
            # Blank lines would make int() raise; nothing to kill here.
            continue
        pid = int(line)
        if pid > 0:
            subprocess.call('ssh ' + nsmhost + ' "kill ' + str(pid) + '"',
                            shell=True)
90 
91 
92 # Start NSMD on all nodes
93 
def start_nsmd(conffile):
    """Start nsmd on every node named in the configuration ``conffile``.

    nsmd is started on the control node, on the event server node unless
    its host name is contained in the control host name, and on each event
    processor node whose two-digit id does not appear in the configured
    bad list.  A confirmation line is printed for each host.
    """
    # Global parameters
    nsmdir = get_ergetconf(conffile, 'system', 'nsmdir_base')
    port = get_ergetconf(conffile, 'system', 'nsmport')

    # Run nsmd on control node
    ctlhost = get_ergetconf(conffile, 'master', 'ctlhost')
    run_nsmd(nsmdir, port, ctlhost)
    print('nsmd on %s started' % ctlhost)

    # Run nsmd on event server node, unless it shares the control host
    evshost = get_ergetconf(conffile, 'distributor', 'ctlhost')
    if evshost not in ctlhost:
        run_nsmd(nsmdir, port, evshost)
        print('nsmd on %s started' % evshost)

    # Run nsmd on event processor nodes
    nnodes = int(get_ergetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_ergetconf(conffile, 'processor', 'idbase'))
    badlist = get_ergetconf(conffile, 'processor', 'badlist')
    evphostbase = get_ergetconf(conffile, 'processor', 'ctlhostbase')
    for i in range(procid, procid + nnodes):
        nodeid = '%2.2d' % i
        # Substring test against the raw bad-list text, as configured.
        if nodeid in badlist:
            continue
        evphost = evphostbase + nodeid
        run_nsmd(nsmdir, port, evphost)
        print('nsmd on %s started' % evphost)
121 
122 
def stop_nsmd(conffile):
    """Stop nsmd on every node named in the configuration ``conffile``.

    nsmd is killed on the control node, on the event server node unless its
    host name is contained in the control host name, and on each event
    processor node whose two-digit id does not appear in the configured
    bad list.  A confirmation line is printed for each host.
    """
    port = get_ergetconf(conffile, 'system', 'nsmport')

    # Kill nsmd on control node
    ctlhost = get_ergetconf(conffile, 'master', 'ctlhost')
    kill_nsmd(port, ctlhost)
    print('nsmd on %s stopped' % ctlhost)

    # Kill nsmd on event server node, unless it shares the control host
    evshost = get_ergetconf(conffile, 'distributor', 'ctlhost')
    if evshost not in ctlhost:
        kill_nsmd(port, evshost)
        print('nsmd on %s stopped' % evshost)

    # Kill nsmd on event processor nodes
    nnodes = int(get_ergetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_ergetconf(conffile, 'processor', 'idbase'))
    badlist = get_ergetconf(conffile, 'processor', 'badlist')
    evphostbase = get_ergetconf(conffile, 'processor', 'ctlhostbase')
    for i in range(procid, procid + nnodes):
        nodeid = '%2.2d' % i
        # Substring test against the raw bad-list text, as configured.
        if nodeid in badlist:
            continue
        evphost = evphostbase + nodeid
        kill_nsmd(port, evphost)
        print('nsmd on %s stopped' % evphost)
148 
149 
150 # RFARM server operations
151 # Run eventserver
152 
def run_distributor(conffile):
    """Start ``ereco_distributor`` on the distributor control host via ssh.

    The local directory ``<execdir_base>/distributor`` is created when
    missing.  The remote command changes into ``execdir_base``, sets
    ``NSM2_PORT`` and redirects the program output to
    ``distributor/nsmlog.log``.  The ssh process is not waited for; the
    function sleeps one second and returns.
    """
    evshost = get_ergetconf(conffile, 'distributor', 'ctlhost')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    port = get_ergetconf(conffile, 'system', 'nsmport')

    workdir = basedir + '/distributor'
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    # ``setenv`` and ``> &`` are csh constructs -- presumably the remote
    # login shell is csh/tcsh (not verifiable from this file).
    cmd = ('ssh ' + evshost + ' "cd ' + basedir + '; setenv NSM2_PORT ' + port
           + '; ereco_distributor ' + get_configpath(conffile)
           + ' > & distributor/nsmlog.log" ')
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
165 
166 
167 # Stop eventserver
168 
def stop_distributor(conffile):
    """Abort and kill the distributor and remove its ring buffer and shm.

    ``rfcommand <conffile> distributor RC_ABORT`` is issued first.  Then, for
    every pid listed in ``<execdir_base>/distributor/pid.data``, an ssh
    command on the distributor host kills the process and runs ``removerb``
    on ``<unitname>:<ringbuffer>`` and ``removeshm`` on
    ``<unitname>:distributor``.

    Raises whatever ``open`` raises when the pid file cannot be read.
    """
    evshost = get_ergetconf(conffile, 'distributor', 'ctlhost')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    unit = get_ergetconf(conffile, 'system', 'unitname')
    ringbuf = get_ergetconf(conffile, 'distributor', 'ringbuffer')
    rbufname = unit + ':' + ringbuf
    shmname = unit + ':distributor'

    subprocess.call('rfcommand ' + conffile + ' distributor RC_ABORT',
                    shell=True)

    pidfile = basedir + '/distributor/pid.data'
    # Read the pids up front so the file is closed deterministically, and
    # strip the line terminator: left in place it would split the remote
    # command line right after "kill <pid>".
    with open(pidfile, 'r') as f:
        pids = [line.strip() for line in f]
    for pid in pids:
        if not pid:
            continue
        cmd = ('ssh ' + evshost + ' "kill ' + pid + '; removerb ' + rbufname
               + '; removeshm ' + shmname + '"')
        print(cmd)
        subprocess.call(cmd, shell=True)
186 
187 
188 # Start event processor
189 
def run_eventprocessor(conffile):
    """Start ``ereco_eventprocessor`` on every usable event processor node.

    Node ids run from ``idbase`` for ``nnodes`` entries and are formatted as
    two digits; ids found in the configured bad list are skipped.  For each
    remaining node the local directory ``<execdir_base>/evp_<nodebase><id>``
    is created when missing and the program is started over ssh on
    ``<ctlhostbase><id>`` with its output redirected to ``nsmlog.log`` in
    that directory.  The function sleeps one second after each launch.
    """
    hostbase = get_ergetconf(conffile, 'processor', 'ctlhostbase')
    nodebase = get_ergetconf(conffile, 'processor', 'nodebase')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    port = get_ergetconf(conffile, 'system', 'nsmport')
    nnodes = int(get_ergetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_ergetconf(conffile, 'processor', 'idbase'))
    badlist = get_ergetconf(conffile, 'processor', 'badlist')
    # The configuration path is the same for every node; resolve it once.
    confpath = get_configpath(conffile)

    for i in range(procid, procid + nnodes):
        nodeid = '%2.2d' % i
        # Substring test against the raw bad-list text, as configured.
        if nodeid in badlist:
            continue
        evphost = hostbase + nodeid
        nodename = nodebase + nodeid
        workdir = basedir + '/evp_' + nodename
        if not os.path.exists(workdir):
            os.mkdir(workdir)
        cmd = ('ssh ' + evphost + ' "cd ' + basedir + '; setenv NSM2_PORT '
               + port + '; ereco_eventprocessor ' + confpath
               + ' > & evp_' + nodename + '/nsmlog.log" ')
        print(cmd)
        # Not waited for: the remote program keeps the ssh session open.
        subprocess.Popen(cmd, shell=True)
        time.sleep(1)
213 
214 
215 # Stop event processor
216 
def stop_eventprocessor(conffile):
    """Abort and kill the event processors and clean up their IPC resources.

    For every node id not found in the configured bad list the node name
    ``evp_<nodebase><id>`` is printed in its ``<unitname>:<node>`` form,
    ``rfcommand <conffile> <node> RC_ABORT`` is issued, and for each pid in
    ``<execdir_base>/<node>/pid.data`` an ssh command on the node kills the
    process, removes the input and output ring buffers and the node's shared
    memory, and finally runs ``clear_basf2_ipc``.

    Raises whatever ``open`` raises when a pid file cannot be read.
    """
    hostbase = get_ergetconf(conffile, 'processor', 'ctlhostbase')
    nodebase = get_ergetconf(conffile, 'processor', 'nodebase')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    nnodes = int(get_ergetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_ergetconf(conffile, 'processor', 'idbase'))
    badlist = get_ergetconf(conffile, 'processor', 'badlist')

    unit = get_ergetconf(conffile, 'system', 'unitname')
    # The ring buffer names are read from the 'collector' section.
    rbufin = get_ergetconf(conffile, 'collector', 'ringbufin')
    rbufout = get_ergetconf(conffile, 'collector', 'ringbufout')
    rbufinname = unit + ':' + rbufin
    rbufoutname = unit + ':' + rbufout

    for i in range(procid, procid + nnodes):
        nodeid = '%2.2d' % i
        # Substring test against the raw bad-list text, as configured.
        if nodeid in badlist:
            continue
        evphost = hostbase + nodeid
        nodename = 'evp_' + nodebase + nodeid
        shmname = unit + ':' + nodename
        print(shmname)
        subprocess.call('rfcommand ' + conffile + ' ' + nodename
                        + ' RC_ABORT', shell=True)

        pidfile = basedir + '/' + nodename + '/pid.data'
        # Close the file deterministically and drop the line terminator,
        # which would otherwise split the remote command after "kill <pid>".
        with open(pidfile, 'r') as f:
            pids = [line.strip() for line in f]
        for pid in pids:
            if not pid:
                continue
            cmd = ('ssh ' + evphost + ' "kill ' + pid + '; removerb '
                   + rbufinname + '; removerb ' + rbufoutname
                   + '; removeshm ' + shmname + '; clear_basf2_ipc"')
            print(cmd)
            subprocess.call(cmd, shell=True)
253 
254 
255 # Run dqmserver
256 
def run_dqmserver(conffile):
    """Start ``rf_dqmserver`` on the DQM server control host via ssh.

    The local directory ``<execdir_base>/dqmserver`` is created when
    missing.  The remote command changes into ``execdir_base``, sets
    ``NSM2_PORT`` and redirects the program output to
    ``dqmserver/nsmlog.log``.  The ssh process is not waited for; the
    function sleeps one second and returns.
    """
    dqmhost = get_ergetconf(conffile, 'dqmserver', 'ctlhost')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    port = get_ergetconf(conffile, 'system', 'nsmport')

    workdir = basedir + '/dqmserver'
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    cmd = ('ssh ' + dqmhost + ' "cd ' + basedir + '; setenv NSM2_PORT ' + port
           + '; rf_dqmserver ' + get_configpath(conffile)
           + ' > & dqmserver/nsmlog.log" ')
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
269 
270 
271 # Stop dqmserver
272 
def stop_dqmserver(conffile):
    """Abort the DQM server with ``rfcommand`` and kill its processes.

    ``rfcommand <conffile> dqmserver RC_ABORT`` is issued first; afterwards
    every pid listed in ``<execdir_base>/dqmserver/pid.data`` is killed over
    ssh on the DQM server control host.

    Raises whatever ``open`` raises when the pid file cannot be read.
    """
    dqmhost = get_ergetconf(conffile, 'dqmserver', 'ctlhost')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')

    subprocess.call('rfcommand ' + conffile + ' dqmserver RC_ABORT',
                    shell=True)

    pidfile = basedir + '/dqmserver/pid.data'
    # Close the file deterministically and strip the line terminator so it
    # does not end up inside the quoted remote command.
    with open(pidfile, 'r') as f:
        pids = [line.strip() for line in f]
    for pid in pids:
        if not pid:
            continue
        cmd = 'ssh ' + dqmhost + ' "kill ' + pid + '"'
        print(cmd)
        subprocess.call(cmd, shell=True)
284 
285 # Run eventsampler
286 
287 
def run_eventsampler(conffile):
    """Start ``ereco_eventsampler`` on the event sampler control host via ssh.

    The local directory ``<execdir_base>/sampler`` is created when missing.
    The remote command changes into ``execdir_base``, sets ``NSM2_PORT`` and
    redirects the program output to ``sampler/nsmlog.log``.  The ssh process
    is not waited for; the function sleeps one second and returns.
    """
    samplerhost = get_ergetconf(conffile, 'eventsampler', 'ctlhost')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    port = get_ergetconf(conffile, 'system', 'nsmport')

    workdir = basedir + '/sampler'
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    cmd = ('ssh ' + samplerhost + ' "cd ' + basedir + '; setenv NSM2_PORT '
           + port + '; ereco_eventsampler ' + get_configpath(conffile)
           + ' > & sampler/nsmlog.log" ')
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
300 
301 
302 # Stop eventsampler
303 
def stop_eventsampler(conffile):
    """Abort the event sampler with ``rfcommand`` and kill its processes.

    ``rfcommand <conffile> sampler RC_ABORT`` is issued first; afterwards
    every pid listed in ``<execdir_base>/sampler/pid.data`` is killed over
    ssh on the event sampler control host.

    Raises whatever ``open`` raises when the pid file cannot be read.
    """
    samplerhost = get_ergetconf(conffile, 'eventsampler', 'ctlhost')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')

    subprocess.call('rfcommand ' + conffile + ' sampler RC_ABORT', shell=True)

    pidfile = basedir + '/sampler/pid.data'
    # Close the file deterministically and strip the line terminator so it
    # does not end up inside the quoted remote command.
    with open(pidfile, 'r') as f:
        pids = [line.strip() for line in f]
    for pid in pids:
        if not pid:
            continue
        cmd = 'ssh ' + samplerhost + ' "kill ' + pid + '"'
        print(cmd)
        subprocess.call(cmd, shell=True)
315 
316 
317 # Run local master
318 
def run_master(conffile):
    """Start ``ereco_master_local`` on the master control host via ssh.

    The local directory ``<execdir_base>/master`` is created when missing.
    The remote command changes into ``execdir_base``, sets ``NSM2_PORT`` and
    redirects the program output to ``master/nsmlog.log``.  The ssh process
    is not waited for; the function sleeps one second and returns.
    """
    masterhost = get_ergetconf(conffile, 'master', 'ctlhost')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    port = get_ergetconf(conffile, 'system', 'nsmport')

    workdir = basedir + '/master'
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    cmd = ('ssh ' + masterhost + ' "cd ' + basedir + '; setenv NSM2_PORT '
           + port + '; ereco_master_local ' + get_configpath(conffile)
           + ' > & master/nsmlog.log" ')
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
331 
332 
333 # Stop local master
334 
def stop_master(conffile):
    """Kill the local master processes on the master control host.

    Every pid listed in ``<execdir_base>/master/pid.data`` is killed over
    ssh.  No ``rfcommand`` request is sent to the master beforehand.

    Raises whatever ``open`` raises when the pid file cannot be read.
    """
    masterhost = get_ergetconf(conffile, 'master', 'ctlhost')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')

    pidfile = basedir + '/master/pid.data'
    # Close the file deterministically and strip the line terminator so it
    # does not end up inside the quoted remote command.
    with open(pidfile, 'r') as f:
        pids = [line.strip() for line in f]
    for pid in pids:
        if not pid:
            continue
        cmd = 'ssh ' + masterhost + ' "kill ' + pid + '"'
        print(cmd)
        subprocess.call(cmd, shell=True)
346 
347 
def start_ereco_components(conffile):
    """Start the ERECO components for the configuration ``conffile``.

    The launchers run in this order: event processors, distributor,
    DQM server, event sampler.
    """
    launchers = (
        run_eventprocessor,
        run_distributor,
        run_dqmserver,
        run_eventsampler,
    )
    for launch in launchers:
        launch(conffile)
353 
354 
355 # Stop ERECO components
356 
def stop_ereco_components(conffile):
    """Stop the ERECO components for the configuration ``conffile``.

    The stoppers run in this order: event sampler, DQM server, distributor,
    event processors.
    """
    stoppers = (
        stop_eventsampler,
        stop_dqmserver,
        stop_distributor,
        stop_eventprocessor,
    )
    for stop in stoppers:
        stop(conffile)
362 
363 
364 # Start ERECO local operation
365 
def start_ereco_local(conffile):
    """Start ERECO local operation: all components first, then the local master."""
    for step in (start_ereco_components, run_master):
        step(conffile)
369 
370 
371 # Stop ERECO local operation
372 
def stop_ereco_local(conffile):
    """Stop ERECO local operation: all components first, then the local master."""
    for step in (stop_ereco_components, stop_master):
        step(conffile)