Belle II Software  release-05-01-25
erecoutil.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 
4 import os
5 import sys
6 import subprocess
7 import signal
8 import socket
9 import time
10 
11 
12 # Basic Utilities
13 # Get full path of the configuration file
14 
def get_configpath(conffile):
    """Return the full path of the configuration file named *conffile*.

    The directory is read from the ERECO_CONFDIR environment variable and
    the '.conf' suffix is appended to *conffile*.

    Prints a message and exits the interpreter with status 1 (SystemExit)
    when ERECO_CONFDIR is unset or empty.
    """
    confdir = os.environ.get('ERECO_CONFDIR')
    # Test the value itself rather than its str() form, so a directory
    # literally named 'None' is not mistaken for an unset variable, and an
    # empty value does not silently produce a path under '/'.
    if not confdir:
        print('ERECO_CONFDIR is not defined. Exit.')
        # Non-zero status so that calling scripts can detect the failure.
        sys.exit(1)
    return confdir + '/' + conffile + '.conf'
22 
23 
24 # Get Configuration from config file
def get_ergetconf(conffile, item1, item2='NULL', item3='NULL'):
    """Return the standard output of ``rfgetconf`` for a configuration item.

    Runs ``rfgetconf <config path> item1 item2 item3`` through the shell.
    The config path comes from get_configpath(conffile), which exits the
    interpreter when ERECO_CONFDIR is not defined.  item2 and item3 default
    to the literal string 'NULL'.

    The captured output is returned as-is; no whitespace is stripped.
    """
    cmd = ' '.join(['rfgetconf', get_configpath(conffile),
                    item1, item2, item3])
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    # communicate() drains both pipes while waiting for the child; calling
    # wait() first can deadlock once the child fills a pipe buffer.
    output, _ = p.communicate()
    if not isinstance(output, str):
        # Python 3 pipes yield bytes, but callers concatenate the result
        # with str.  Under Python 2 this branch is never taken.
        output = output.decode()
    return output
39 
40 
41 # print "waiting"
42 # confout = p.stdout.read()
43 
44 # NSMD related utilities
45 # Run NSMD
46 
def run_nsmd(nsmdir, port, nsmhost):
    """Launch nsmd2 on *nsmhost* through ssh.

    The directory <nsmdir>/<nsmhost> is created (checked on the local
    filesystem) if it does not exist.  The remote command changes into that
    directory, sets NSMLOGDIR to it and starts nsmd2 with *port* given to
    both -p and -s.  The ssh process is not waited for; the function only
    sleeps one second after spawning it.
    """
    # Make sure the logging directory exists.
    logdir = nsmdir + '/' + nsmhost
    if not os.path.exists(logdir):
        os.mkdir(logdir)

    daemon = ''.join(['nsmd2 -f -p ', port, ' -s ', port, ' -h ', nsmhost])
    cmd = ''.join(['ssh ', nsmhost, ' "cd ', logdir,
                   '; setenv NSMLOGDIR ', logdir, ';', daemon, '"'])
    print(cmd)
    child = subprocess.Popen(cmd, shell=True)
    time.sleep(1)
60 
61 
62 # p.wait()
63 
64 # Kill NSMD
65 
def kill_nsmd(port, nsmhost):
    """Kill the nsmd2 processes on *nsmhost* whose ``ps`` line matches *port*.

    The PIDs are obtained by running ``ps -fC nsmd2 | grep <port>`` on the
    remote host over ssh and printing the second column with awk.  Each PID
    is then killed with a separate ``ssh <nsmhost> "kill <pid>"``.

    Parameters:
        port: string used as the grep pattern.
        nsmhost: host name handed to ssh.
    """
    # The doubled backslash yields \$2 in the command line, so the local
    # shell passes a literal $2 on to the remote awk instead of expanding it.
    cmd = 'ssh ' + nsmhost + ' "ps -fC nsmd2 | grep ' + port \
        + "| awk '{print \\$2}' \""
    # Read the PID list from a pipe instead of a temp.pid scratch file in
    # the current directory, which was never closed or removed.
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
    listing = p.communicate()[0]
    for line in listing.splitlines():
        field = line.strip()
        # Skip blank or non-numeric lines rather than crashing in int().
        if not field.isdigit():
            continue
        pid = int(field)
        if pid > 0:
            kill_cmd = 'ssh ' + nsmhost + ' "kill ' + str(pid) + '"'
            subprocess.Popen(kill_cmd, shell=True).wait()
80 
81 
82 # Start NSMD on all nodes
83 
def start_nsmd(conffile):
    """Start nsmd on every node described by the configuration *conffile*.

    nsmd is started on the master control host, on the distributor control
    host (unless its name occurs inside the master host name) and on each
    processor node whose two-digit id does not occur in the bad list.
    """
    # Settings shared by all nodes.
    nsmdir = get_ergetconf(conffile, 'system', 'nsmdir_base')
    port = get_ergetconf(conffile, 'system', 'nsmport')

    # Control node.
    ctlhost = get_ergetconf(conffile, 'master', 'ctlhost')
    run_nsmd(nsmdir, port, ctlhost)
    print('nsmd on %s started' % ctlhost)

    # Event server node, skipped when its name is contained in ctlhost.
    evshost = get_ergetconf(conffile, 'distributor', 'ctlhost')
    if evshost not in ctlhost:
        run_nsmd(nsmdir, port, evshost)
        print('nsmd on %s started' % evshost)

    # Event processor nodes.
    nnodes = int(get_ergetconf(conffile, 'processor', 'nnodes'))
    first_id = int(get_ergetconf(conffile, 'processor', 'idbase'))
    badlist = get_ergetconf(conffile, 'processor', 'badlist')
    evphostbase = get_ergetconf(conffile, 'processor', 'ctlhostbase')
    for number in range(first_id, first_id + nnodes):
        nodeid = '%2.2d' % number
        if nodeid in badlist:
            continue
        evphost = evphostbase + nodeid
        run_nsmd(nsmdir, port, evphost)
        print('nsmd on %s started' % evphost)
111 
112 
def stop_nsmd(conffile):
    """Kill nsmd on every node described by the configuration *conffile*.

    Covers the master control host, the distributor control host (unless
    its name occurs inside the master host name) and each processor node
    whose two-digit id does not occur in the bad list.
    """
    port = get_ergetconf(conffile, 'system', 'nsmport')

    # Kill nsmd on the control node.
    ctlhost = get_ergetconf(conffile, 'master', 'ctlhost')
    kill_nsmd(port, ctlhost)
    print('nsmd on %s stopped' % ctlhost)

    # Kill nsmd on the event server node, skipped when its name is
    # contained in ctlhost.
    evshost = get_ergetconf(conffile, 'distributor', 'ctlhost')
    if evshost not in ctlhost:
        kill_nsmd(port, evshost)
        print('nsmd on %s stopped' % evshost)

    # Kill nsmd on the event processor nodes.
    nnodes = int(get_ergetconf(conffile, 'processor', 'nnodes'))
    first_id = int(get_ergetconf(conffile, 'processor', 'idbase'))
    badlist = get_ergetconf(conffile, 'processor', 'badlist')
    evphostbase = get_ergetconf(conffile, 'processor', 'ctlhostbase')
    for number in range(first_id, first_id + nnodes):
        nodeid = '%2.2d' % number
        if nodeid in badlist:
            continue
        evphost = evphostbase + nodeid
        kill_nsmd(port, evphost)
        print('nsmd on %s stopped' % evphost)
138 
139 
140 # RFARM server operations
141 # Run eventserver
142 
def run_distributor(conffile):
    """Launch ereco_distributor on the distributor control host via ssh.

    The remote command changes into the execution base directory, sets
    NSM2_PORT and redirects the program output to distributor/nsmlog.log.
    The ssh process is not waited for; the function only sleeps one second
    after spawning it.
    """
    evshost = get_ergetconf(conffile, 'distributor', 'ctlhost')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    port = get_ergetconf(conffile, 'system', 'nsmport')

    # Make sure the log directory exists (checked on the local filesystem).
    logdir = basedir + '/distributor'
    if not os.path.exists(logdir):
        os.mkdir(logdir)

    remote = ''.join(['cd ', basedir, '; setenv NSM2_PORT ', port,
                      '; ereco_distributor ', get_configpath(conffile),
                      ' > & distributor/nsmlog.log'])
    cmd = 'ssh ' + evshost + ' "' + remote + '" '
    print(cmd)
    child = subprocess.Popen(cmd, shell=True)
    time.sleep(1)
155 
156 
157 # Stop eventserver
158 
def stop_distributor(conffile):
    """Abort the distributor and clean up its process and shared resources.

    First sends ``rfcommand <conffile> distributor RC_ABORT`` and waits for
    it.  Then, for every PID listed in <execdir_base>/distributor/pid.data,
    runs over ssh on the distributor host: ``kill <pid>``, ``removerb`` on
    the ring buffer '<unit>:<ringbuffer>' and ``removeshm`` on
    '<unit>:distributor'.

    Raises IOError/OSError if the pid file cannot be opened.
    """
    evshost = get_ergetconf(conffile, 'distributor', 'ctlhost')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    unit = get_ergetconf(conffile, 'system', 'unitname')
    ringbuf = get_ergetconf(conffile, 'distributor', 'ringbuffer')
    rbufname = unit + ':' + ringbuf
    shmname = unit + ':distributor'

    p = subprocess.Popen('rfcommand ' + conffile +
                         ' distributor RC_ABORT', shell=True)
    p.wait()

    pidfile = basedir + '/distributor/pid.data'
    # Close the file deterministically and strip each entry: the raw line
    # keeps its trailing newline, which would otherwise end up in the middle
    # of the remote command.  Blank lines are dropped so that no bare
    # 'kill' is issued.
    with open(pidfile, 'r') as f:
        pids = [line.strip() for line in f if line.strip()]

    for pid in pids:
        cmd = 'ssh ' + evshost + ' "kill ' + pid + '; removerb ' + rbufname \
            + '; removeshm ' + shmname + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
176 
177 
178 # Start event processor
179 
def run_eventprocessor(conffile):
    """Launch ereco_eventprocessor on every usable processor node via ssh.

    Node ids run from 'idbase' for 'nnodes' entries and are formatted as two
    digits; ids that occur in the 'badlist' string are skipped.  For each
    remaining node the directory <execdir_base>/evp_<nodebase><id> is
    created if missing (checked on the local filesystem), the remote
    command is spawned without waiting for it, and the function sleeps one
    second before moving on to the next node.
    """
    hostbase = get_ergetconf(conffile, 'processor', 'ctlhostbase')
    nodebase = get_ergetconf(conffile, 'processor', 'nodebase')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    port = get_ergetconf(conffile, 'system', 'nsmport')
    nnodes = int(get_ergetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_ergetconf(conffile, 'processor', 'idbase'))
    badlist = get_ergetconf(conffile, 'processor', 'badlist')
    # The config path is the same for every node, so build it once.
    confpath = get_configpath(conffile)

    for i in range(procid, procid + nnodes):
        nodeid = '%2.2d' % i
        if badlist.find(nodeid) != -1:
            continue
        evphost = hostbase + nodeid
        nodename = nodebase + nodeid
        nodedir = basedir + '/evp_' + nodename
        if not os.path.exists(nodedir):
            os.mkdir(nodedir)
        cmd = 'ssh ' + evphost + ' "cd ' + basedir + '; setenv NSM2_PORT ' \
            + port + '; ereco_eventprocessor ' + confpath \
            + ' > & evp_' + nodename + '/nsmlog.log" '
        print(cmd)
        subprocess.Popen(cmd, shell=True)
        time.sleep(1)
203 
204 
205 # Stop event processor
206 
def stop_eventprocessor(conffile):
    """Abort the event processors and clean up their remote resources.

    Node ids run from 'idbase' for 'nnodes' entries and are formatted as two
    digits; ids that occur in the 'badlist' string are skipped.  For each
    remaining node, named evp_<nodebase><id>:

    1. ``rfcommand <conffile> <node> RC_ABORT`` is run and waited for.
    2. For every PID in <execdir_base>/<node>/pid.data an ssh command on the
       node's host runs ``kill <pid>``, ``removerb`` on the collector input
       and output ring buffers, ``removeshm`` on '<unit>:<node>' and
       ``clear_basf2_ipc``.

    Raises IOError/OSError if a pid file cannot be opened.
    """
    hostbase = get_ergetconf(conffile, 'processor', 'ctlhostbase')
    nodebase = get_ergetconf(conffile, 'processor', 'nodebase')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    nnodes = int(get_ergetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_ergetconf(conffile, 'processor', 'idbase'))
    badlist = get_ergetconf(conffile, 'processor', 'badlist')

    unit = get_ergetconf(conffile, 'system', 'unitname')
    rbufin = get_ergetconf(conffile, 'collector', 'ringbufin')
    rbufout = get_ergetconf(conffile, 'collector', 'ringbufout')
    rbufinname = unit + ':' + rbufin
    rbufoutname = unit + ':' + rbufout

    for i in range(procid, procid + nnodes):
        nodeid = '%2.2d' % i
        if badlist.find(nodeid) != -1:
            continue
        evphost = hostbase + nodeid
        nodename = 'evp_' + nodebase + nodeid
        shmname = unit + ':' + nodename
        print(shmname)
        p = subprocess.Popen('rfcommand ' + conffile + ' ' + nodename +
                             ' RC_ABORT', shell=True)
        p.wait()

        pidfile = basedir + '/' + nodename + '/pid.data'
        # Close the file deterministically and strip each entry: the raw
        # line keeps its trailing newline, which would otherwise end up in
        # the middle of the remote command.  Blank lines are dropped so
        # that no bare 'kill' is issued.
        with open(pidfile, 'r') as f:
            pids = [line.strip() for line in f if line.strip()]

        for pid in pids:
            cmd = 'ssh ' + evphost + ' "kill ' + pid + '; removerb ' \
                + rbufinname + '; removerb ' + rbufoutname \
                + '; removeshm ' + shmname + '; clear_basf2_ipc"'
            print(cmd)
            p = subprocess.Popen(cmd, shell=True)
            p.wait()
243 
244 
245 # Run dqmserver
246 
def run_dqmserver(conffile):
    """Launch rf_dqmserver on the dqmserver control host via ssh.

    The remote command changes into the execution base directory, sets
    NSM2_PORT and redirects the program output to dqmserver/nsmlog.log.
    The ssh process is not waited for; the function only sleeps one second
    after spawning it.
    """
    dqmhost = get_ergetconf(conffile, 'dqmserver', 'ctlhost')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    port = get_ergetconf(conffile, 'system', 'nsmport')

    # Make sure the log directory exists (checked on the local filesystem).
    logdir = basedir + '/dqmserver'
    if not os.path.exists(logdir):
        os.mkdir(logdir)

    remote = ''.join(['cd ', basedir, '; setenv NSM2_PORT ', port,
                      '; rf_dqmserver ', get_configpath(conffile),
                      ' > & dqmserver/nsmlog.log'])
    cmd = 'ssh ' + dqmhost + ' "' + remote + '" '
    print(cmd)
    child = subprocess.Popen(cmd, shell=True)
    time.sleep(1)
259 
260 
261 # Stop dqmserver
262 
def stop_dqmserver(conffile):
    """Abort the dqmserver and kill its recorded processes.

    Sends ``rfcommand <conffile> dqmserver RC_ABORT`` and waits for it, then
    runs ``kill <pid>`` over ssh on the dqmserver host for every PID listed
    in <execdir_base>/dqmserver/pid.data.

    Raises IOError/OSError if the pid file cannot be opened.
    """
    dqmhost = get_ergetconf(conffile, 'dqmserver', 'ctlhost')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    p = subprocess.Popen('rfcommand ' + conffile + ' dqmserver RC_ABORT',
                         shell=True)
    p.wait()

    pidfile = basedir + '/dqmserver/pid.data'
    # Close the file deterministically; strip the trailing newline from each
    # entry and drop blank lines so that no bare 'kill' is issued.
    with open(pidfile, 'r') as f:
        pids = [line.strip() for line in f if line.strip()]

    for pid in pids:
        cmd = 'ssh ' + dqmhost + ' "kill ' + pid + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
274 
275 # Run eventsampler
276 
277 
def run_eventsampler(conffile):
    """Launch ereco_eventsampler on the eventsampler control host via ssh.

    The remote command changes into the execution base directory, sets
    NSM2_PORT and redirects the program output to sampler/nsmlog.log.
    The ssh process is not waited for; the function only sleeps one second
    after spawning it.
    """
    samplerhost = get_ergetconf(conffile, 'eventsampler', 'ctlhost')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    port = get_ergetconf(conffile, 'system', 'nsmport')

    # Make sure the log directory exists (checked on the local filesystem).
    logdir = basedir + '/sampler'
    if not os.path.exists(logdir):
        os.mkdir(logdir)

    remote = ''.join(['cd ', basedir, '; setenv NSM2_PORT ', port,
                      '; ereco_eventsampler ', get_configpath(conffile),
                      ' > & sampler/nsmlog.log'])
    cmd = 'ssh ' + samplerhost + ' "' + remote + '" '
    print(cmd)
    child = subprocess.Popen(cmd, shell=True)
    time.sleep(1)
290 
291 
292 # Stop eventsampler
293 
def stop_eventsampler(conffile):
    """Abort the event sampler and kill its recorded processes.

    Sends ``rfcommand <conffile> sampler RC_ABORT`` and waits for it, then
    runs ``kill <pid>`` over ssh on the eventsampler host for every PID
    listed in <execdir_base>/sampler/pid.data.

    Raises IOError/OSError if the pid file cannot be opened.
    """
    samplerhost = get_ergetconf(conffile, 'eventsampler', 'ctlhost')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    p = subprocess.Popen('rfcommand ' + conffile + ' sampler RC_ABORT',
                         shell=True)
    p.wait()

    pidfile = basedir + '/sampler/pid.data'
    # Close the file deterministically; strip the trailing newline from each
    # entry and drop blank lines so that no bare 'kill' is issued.
    with open(pidfile, 'r') as f:
        pids = [line.strip() for line in f if line.strip()]

    for pid in pids:
        cmd = 'ssh ' + samplerhost + ' "kill ' + pid + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
305 
306 
307 # Run local master
308 
def run_master(conffile):
    """Launch ereco_master_local on the master control host via ssh.

    The remote command changes into the execution base directory, sets
    NSM2_PORT and redirects the program output to master/nsmlog.log.
    The ssh process is not waited for; the function only sleeps one second
    after spawning it.
    """
    masterhost = get_ergetconf(conffile, 'master', 'ctlhost')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    port = get_ergetconf(conffile, 'system', 'nsmport')

    # Make sure the log directory exists (checked on the local filesystem).
    logdir = basedir + '/master'
    if not os.path.exists(logdir):
        os.mkdir(logdir)

    remote = ''.join(['cd ', basedir, '; setenv NSM2_PORT ', port,
                      '; ereco_master_local ', get_configpath(conffile),
                      ' > & master/nsmlog.log'])
    cmd = 'ssh ' + masterhost + ' "' + remote + '" '
    print(cmd)
    child = subprocess.Popen(cmd, shell=True)
    time.sleep(1)
321 
322 
323 # Stop local master
324 
def stop_master(conffile):
    """Kill the local master processes recorded in its pid file.

    Runs ``kill <pid>`` over ssh on the master control host for every PID
    listed in <execdir_base>/master/pid.data.  No rfcommand request is sent
    beforehand.

    Raises IOError/OSError if the pid file cannot be opened.
    """
    masterhost = get_ergetconf(conffile, 'master', 'ctlhost')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')

    pidfile = basedir + '/master/pid.data'
    # Close the file deterministically; strip the trailing newline from each
    # entry and drop blank lines so that no bare 'kill' is issued.
    with open(pidfile, 'r') as f:
        pids = [line.strip() for line in f if line.strip()]

    for pid in pids:
        cmd = 'ssh ' + masterhost + ' "kill ' + pid + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
336 
337 
def start_ereco_components(conffile):
    """Start the ERECO components for *conffile*.

    Order: event processors, distributor, dqmserver, event sampler.
    """
    launchers = (run_eventprocessor, run_distributor,
                 run_dqmserver, run_eventsampler)
    for launch in launchers:
        launch(conffile)
343 
344 
345 # Stop ERECO components
346 
def stop_ereco_components(conffile):
    """Stop the ERECO components for *conffile*.

    Order: event sampler, dqmserver, distributor, event processors.
    """
    stoppers = (stop_eventsampler, stop_dqmserver,
                stop_distributor, stop_eventprocessor)
    for stop in stoppers:
        stop(conffile)
352 
353 
354 # Start ERECO local operation
355 
def start_ereco_local(conffile):
    """Start the ERECO components and then the local master."""
    for step in (start_ereco_components, run_master):
        step(conffile)
359 
360 
361 # Stop ERECO local operation
362 
def stop_ereco_local(conffile):
    """Stop the ERECO components and then the local master."""
    for step in (stop_ereco_components, stop_master):
        step(conffile)