Belle II Software  release-08-01-10
erecoutil.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 
4 
11 
12 import os
13 import sys
14 import subprocess
15 import time
16 
17 
18 # Basic Utilities
19 # Get full path of the configuration file
20 
21 def get_configpath(conffile):
22  confdir = str(os.environ.get('ERECO_CONFDIR'))
23  if confdir == 'None':
24  print('ERECO_CONFDIR is not defined. Exit.')
25  sys.exit()
26  cmd = confdir + '/' + conffile + '.conf'
27  return cmd
28 
29 
30 # Get Configuration from config file
31 def get_ergetconf(conffile, item1, item2='NULL', item3='NULL'):
32 
33  confdir = str(os.environ.get('ERECO_CONFDIR'))
34  if confdir == 'None':
35  print('ERECO_CONFDIR is not defined. Exit.')
36  sys.exit()
37  cmd = 'rfgetconf ' + get_configpath(conffile) + ' ' + item1 + ' ' + item2 \
38  + ' ' + item3
39  p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
40  stderr=subprocess.PIPE)
41  p.wait()
42  output = p.stdout.read()
43 # print(output)
44  return output
45 
46 
47 # print("waiting")
48 # confout = p.stdout.read()
49 
50 # NSMD related utilities
51 # Run NSMD
52 
53 def run_nsmd(nsmdir, port, nsmhost):
54  # Check directory for loggin
55  if not os.path.exists(nsmdir + '/' + nsmhost):
56  os.mkdir(nsmdir + '/' + nsmhost)
57  # nsmd = str(os.environ.get('BELLE2_EXTERNALS_BIN')) + '/nsmd2 -f -p ' \
58  nsmd = "nsmd2 -f -p " \
59  + port + ' -s ' + port + ' -h '
60  cmd = 'ssh ' + nsmhost + ' "cd ' + nsmdir + '/' + nsmhost \
61  + '; setenv NSMLOGDIR ' + nsmdir + '/' + nsmhost + ';' + nsmd \
62  + nsmhost + '"'
63  print(cmd)
64  subprocess.Popen(cmd, shell=True)
65  time.sleep(1)
66 
67 
68 # p.wait()
69 
70 # Kill NSMD
71 
72 def kill_nsmd(port, nsmhost):
73  # In the line below, b2code-style-fix changes '{print \$2}'
74  # into '{print \\$2}'. To avoid this, noqa is necessary.
75  cmd = 'ssh ' + nsmhost + ' "ps -fC nsmd2 | grep ' + port \
76  + "| awk '{print \$2}' \" > temp.pid" # noqa
77 # cmd = "ssh -v " + nsmhost + " \"ps -fC nsmd2 | grep " + port + "| awk '{printf(\"klll \%d\", \$2)} | sh' \""
78 # print(cmd)
79  p = subprocess.Popen(cmd, shell=True)
80  p.wait()
81  for line in open('temp.pid', 'r'):
82  pid = int(line)
83  if pid > 0:
84  cmd = 'ssh ' + nsmhost + ' "kill ' + str(pid) + '"'
85 # print(cmd)
86  p = subprocess.Popen(cmd, shell=True)
87  p.wait()
88 
89 
90 # Start NSMD on all nodes
91 
92 def start_nsmd(conffile):
93  # Global parameters
94  nsmdir = get_ergetconf(conffile, 'system', 'nsmdir_base')
95  port = get_ergetconf(conffile, 'system', 'nsmport')
96 
97  # Run nsmd on control node
98  ctlhost = get_ergetconf(conffile, 'master', 'ctlhost')
99  run_nsmd(nsmdir, port, ctlhost)
100  print('nsmd on %s started' % ctlhost)
101 
102  # Run nsmd on event server node
103  evshost = get_ergetconf(conffile, 'distributor', 'ctlhost')
104  if ctlhost.find(evshost) == -1:
105  run_nsmd(nsmdir, port, evshost)
106  print('nsmd on %s started' % evshost)
107 
108  # Run nsmd on event processor nodes
109  nnodes = int(get_ergetconf(conffile, 'processor', 'nnodes'))
110  procid = int(get_ergetconf(conffile, 'processor', 'idbase'))
111  badlist = get_ergetconf(conffile, 'processor', 'badlist')
112  evphostbase = get_ergetconf(conffile, 'processor', 'ctlhostbase')
113  for i in range(procid, procid + nnodes):
114  nodeid = '%2.2d' % i
115  if badlist.find(nodeid) == -1:
116  evphost = evphostbase + nodeid
117  run_nsmd(nsmdir, port, evphost)
118  print('nsmd on %s started' % evphost)
119 
120 
121 def stop_nsmd(conffile):
122  port = get_ergetconf(conffile, 'system', 'nsmport')
123 
124  # Kill nsmd on control node
125  ctlhost = get_ergetconf(conffile, 'master', 'ctlhost')
126  kill_nsmd(port, ctlhost)
127  print('nsmd on %s stopped' % ctlhost)
128 
129  # Run nsmd on event server node
130  evshost = get_ergetconf(conffile, 'distributor', 'ctlhost')
131  if ctlhost.find(evshost) == -1:
132  kill_nsmd(port, evshost)
133  print('nsmd on %s stopped' % evshost)
134 
135  # Run nsmd on event processor nodes
136  nnodes = int(get_ergetconf(conffile, 'processor', 'nnodes'))
137  procid = int(get_ergetconf(conffile, 'processor', 'idbase'))
138  badlist = get_ergetconf(conffile, 'processor', 'badlist')
139  evphostbase = get_ergetconf(conffile, 'processor', 'ctlhostbase')
140  for i in range(procid, procid + nnodes):
141  nodeid = '%2.2d' % i
142  if badlist.find(nodeid) == -1:
143  evphost = evphostbase + nodeid
144  kill_nsmd(port, evphost)
145  print('nsmd on %s stopped' % evphost)
146 
147 
148 # RFARM server operations
149 # Run eventserver
150 
151 def run_distributor(conffile):
152  evshost = get_ergetconf(conffile, 'distributor', 'ctlhost')
153  basedir = get_ergetconf(conffile, 'system', 'execdir_base')
154  port = get_ergetconf(conffile, 'system', 'nsmport')
155  if not os.path.exists(basedir + '/distributor'):
156  os.mkdir(basedir + '/distributor')
157  cmd = 'ssh ' + evshost + ' "cd ' + basedir + '; setenv NSM2_PORT ' + port \
158  + '; ereco_distributor ' + get_configpath(conffile) \
159  + ' > & distributor/nsmlog.log" '
160  print(cmd)
161  subprocess.Popen(cmd, shell=True)
162  time.sleep(1)
163 
164 
165 # Stop eventserver
166 
167 def stop_distributor(conffile):
168  evshost = get_ergetconf(conffile, 'distributor', 'ctlhost')
169  basedir = get_ergetconf(conffile, 'system', 'execdir_base')
170  unit = get_ergetconf(conffile, 'system', 'unitname')
171  ringbuf = get_ergetconf(conffile, 'distributor', 'ringbuffer')
172  rbufname = unit + ':' + ringbuf
173  shmname = unit + ':distributor'
174  p = subprocess.Popen('rfcommand ' + conffile +
175  ' distributor RC_ABORT', shell=True)
176  p.wait()
177  pidfile = basedir + '/distributor/pid.data'
178  for pid in open(pidfile, 'r'):
179  cmd = 'ssh ' + evshost + ' "kill ' + pid + '; removerb ' + rbufname \
180  + "; removeshm " + shmname + '"'
181  print(cmd)
182  p = subprocess.Popen(cmd, shell=True)
183  p.wait()
184 
185 
186 # Start event procesor
187 
188 def run_eventprocessor(conffile):
189  hostbase = get_ergetconf(conffile, 'processor', 'ctlhostbase')
190  nodebase = get_ergetconf(conffile, 'processor', 'nodebase')
191  basedir = get_ergetconf(conffile, 'system', 'execdir_base')
192  port = get_ergetconf(conffile, 'system', 'nsmport')
193  nnodes = int(get_ergetconf(conffile, 'processor', 'nnodes'))
194  procid = int(get_ergetconf(conffile, 'processor', 'idbase'))
195  badlist = get_ergetconf(conffile, 'processor', 'badlist')
196  id = int(get_ergetconf(conffile, 'processor', 'idbase')) # noqa
197 
198  for i in range(procid, procid + nnodes):
199  nodeid = '%2.2d' % i
200  if badlist.find(nodeid) == -1:
201  evphost = hostbase + nodeid
202  nodename = nodebase + nodeid
203  if not os.path.exists(basedir + '/evp_' + nodename):
204  os.mkdir(basedir + '/evp_' + nodename)
205  cmd = 'ssh ' + evphost + ' "cd ' + basedir + '; setenv NSM2_PORT ' \
206  + port + '; ereco_eventprocessor ' + get_configpath(conffile) \
207  + ' > & evp_' + nodename + '/nsmlog.log" '
208  print(cmd)
209  subprocess.Popen(cmd, shell=True)
210  time.sleep(1)
211 
212 
213 # Stop event procesor
214 
215 def stop_eventprocessor(conffile):
216  hostbase = get_ergetconf(conffile, 'processor', 'ctlhostbase')
217  nodebase = get_ergetconf(conffile, 'processor', 'nodebase')
218  basedir = get_ergetconf(conffile, 'system', 'execdir_base')
219  port = get_ergetconf(conffile, 'system', 'nsmport') # noqa
220  nnodes = int(get_ergetconf(conffile, 'processor', 'nnodes'))
221  procid = int(get_ergetconf(conffile, 'processor', 'idbase'))
222  badlist = get_ergetconf(conffile, 'processor', 'badlist')
223  id = int(get_ergetconf(conffile, 'processor', 'idbase')) # noqa
224 
225  unit = get_ergetconf(conffile, 'system', 'unitname')
226  rbufin = get_ergetconf(conffile, 'collector', 'ringbufin')
227  rbufout = get_ergetconf(conffile, 'collector', 'ringbufout')
228  rbufinname = unit + ':' + rbufin
229  rbufoutname = unit + ':' + rbufout
230 
231  for i in range(procid, procid + nnodes):
232  nodeid = '%2.2d' % i
233  if badlist.find(nodeid) == -1:
234  evphost = hostbase + nodeid
235  nodename = 'evp_' + nodebase + nodeid
236  shmname = unit + ':' + nodename
237  print(shmname)
238  p = subprocess.Popen('rfcommand ' + conffile + ' ' + nodename +
239  ' RC_ABORT', shell=True)
240  p.wait()
241  pidfile = basedir + '/' + nodename + '/pid.data'
242  for pid in open(pidfile, 'r'):
243  cmd = 'ssh ' + evphost + ' "kill ' + pid + '; removerb ' \
244  + rbufinname + '; removerb ' + rbufoutname \
245  + '; removeshm ' + shmname + '; clear_basf2_ipc"'
246 # + '; removeshm ' + '"'
247 # + '; removeshm ' + shmname + '"'
248  print(cmd)
249  p = subprocess.Popen(cmd, shell=True)
250  p.wait()
251 
252 
253 # Run dqmserver
254 
255 def run_dqmserver(conffile):
256  dqmhost = get_ergetconf(conffile, 'dqmserver', 'ctlhost')
257  basedir = get_ergetconf(conffile, 'system', 'execdir_base')
258  port = get_ergetconf(conffile, 'system', 'nsmport')
259  if not os.path.exists(basedir + '/dqmserver'):
260  os.mkdir(basedir + '/dqmserver')
261  cmd = 'ssh ' + dqmhost + ' "cd ' + basedir + '; setenv NSM2_PORT ' + port \
262  + '; rf_dqmserver ' + get_configpath(conffile) \
263  + ' > & dqmserver/nsmlog.log" '
264  print(cmd)
265  subprocess.Popen(cmd, shell=True)
266  time.sleep(1)
267 
268 
269 # Stop dqmserver
270 
271 def stop_dqmserver(conffile):
272  dqmhost = get_ergetconf(conffile, 'dqmserver', 'ctlhost')
273  basedir = get_ergetconf(conffile, 'system', 'execdir_base')
274  p = subprocess.Popen('rfcommand ' + conffile + ' dqmserver RC_ABORT', shell=True)
275  p.wait()
276  pidfile = basedir + '/dqmserver/pid.data'
277  for pid in open(pidfile, 'r'):
278  cmd = 'ssh ' + dqmhost + ' "kill ' + pid + '"'
279  print(cmd)
280  p = subprocess.Popen(cmd, shell=True)
281  p.wait()
282 
283 # Run eventsampler
284 
285 
286 def run_eventsampler(conffile):
287  samplerhost = get_ergetconf(conffile, 'eventsampler', 'ctlhost')
288  basedir = get_ergetconf(conffile, 'system', 'execdir_base')
289  port = get_ergetconf(conffile, 'system', 'nsmport')
290  if not os.path.exists(basedir + '/sampler'):
291  os.mkdir(basedir + '/sampler')
292  cmd = 'ssh ' + samplerhost + ' "cd ' + basedir + '; setenv NSM2_PORT ' + port \
293  + '; ereco_eventsampler ' + get_configpath(conffile) \
294  + ' > & sampler/nsmlog.log" '
295  print(cmd)
296  subprocess.Popen(cmd, shell=True)
297  time.sleep(1)
298 
299 
300 # Stop eventsampler
301 
302 def stop_eventsampler(conffile):
303  samplerhost = get_ergetconf(conffile, 'eventsampler', 'ctlhost')
304  basedir = get_ergetconf(conffile, 'system', 'execdir_base')
305  p = subprocess.Popen('rfcommand ' + conffile + ' sampler RC_ABORT', shell=True)
306  p.wait()
307  pidfile = basedir + '/sampler/pid.data'
308  for pid in open(pidfile, 'r'):
309  cmd = 'ssh ' + samplerhost + ' "kill ' + pid + '"'
310  print(cmd)
311  p = subprocess.Popen(cmd, shell=True)
312  p.wait()
313 
314 
315 # Run local master
316 
317 def run_master(conffile):
318  masterhost = get_ergetconf(conffile, 'master', 'ctlhost')
319  basedir = get_ergetconf(conffile, 'system', 'execdir_base')
320  port = get_ergetconf(conffile, 'system', 'nsmport')
321  if not os.path.exists(basedir + '/master'):
322  os.mkdir(basedir + '/master')
323  cmd = 'ssh ' + masterhost + ' "cd ' + basedir + '; setenv NSM2_PORT ' \
324  + port + '; ereco_master_local ' + get_configpath(conffile) \
325  + ' > & master/nsmlog.log" '
326  print(cmd)
327  subprocess.Popen(cmd, shell=True)
328  time.sleep(1)
329 
330 
331 # Stop local master
332 
333 def stop_master(conffile):
334  masterhost = get_ergetconf(conffile, 'master', 'ctlhost')
335  basedir = get_ergetconf(conffile, 'system', 'execdir_base')
336 # p = subprocess.Popen ( "rfcommand " + conffile + " master RF_UNCONFIGURE", shell=True );
337 # p.wait();
338  pidfile = basedir + '/master/pid.data'
339  for pid in open(pidfile, 'r'):
340  cmd = 'ssh ' + masterhost + ' "kill ' + pid + '"'
341  print(cmd)
342  p = subprocess.Popen(cmd, shell=True)
343  p.wait()
344 
345 
346 def start_ereco_components(conffile):
347  run_eventprocessor(conffile)
348  run_distributor(conffile)
349  run_dqmserver(conffile)
350  run_eventsampler(conffile)
351 
352 
353 # Stop ERECO components
354 
355 def stop_ereco_components(conffile):
356  stop_eventsampler(conffile)
357  stop_dqmserver(conffile)
358  stop_distributor(conffile)
359  stop_eventprocessor(conffile)
360 
361 
362 # Start ERECO local operation
363 
364 def start_ereco_local(conffile):
365  start_ereco_components(conffile)
366  run_master(conffile)
367 
368 
369 # Stop ERECO local operation
370 
371 def stop_ereco_local(conffile):
372  # stop_eventprocessor(conffile)
373  stop_ereco_components(conffile)
374  stop_master(conffile)