Belle II Software development
rfarmutil_bash.py
1#!/usr/bin/env python
2# DESY-TB special Nov.8, 2016, R.Itoh
3# -*- coding: utf-8 -*-
4
5
12
13import os
14import sys
15import subprocess
16import time
17
18
19# Basic Utilities
20# Get full path of the configuration file
21
def get_configpath(conffile):
    """Return the full path of the configuration file named ``conffile``.

    The path is built as ``<RFARM_CONFDIR>/<conffile>.conf``, where
    RFARM_CONFDIR is read from the environment.

    Args:
        conffile: Configuration name without the ``.conf`` suffix.

    Returns:
        The configuration file path as a string.

    Raises:
        SystemExit: With status 1 when RFARM_CONFDIR is not set.
    """
    confdir = os.environ.get('RFARM_CONFDIR')
    # Test for None directly: comparing str(value) with 'None' would also
    # reject a directory that is literally named "None".
    if confdir is None:
        print('RFARM_CONFDIR is not defined. Exit.')
        # Exit with a non-zero status so the failure is visible to the
        # calling shell instead of looking like a success.
        sys.exit(1)
    return confdir + '/' + conffile + '.conf'
29
30
31# Get Configuration from config file
def get_rfgetconf(conffile, item1, item2='NULL', item3='NULL'):
    """Look up a configuration value by running the ``rfgetconf`` command.

    Runs ``rfgetconf <config path> <item1> <item2> <item3>`` through the
    shell and returns what the command wrote to standard output.

    Args:
        conffile: Configuration name, resolved with ``get_configpath``.
        item1: First lookup key.
        item2: Second lookup key; the literal ``'NULL'`` by default.
        item3: Third lookup key; the literal ``'NULL'`` by default.

    Returns:
        The command's standard output as a text string. Nothing is
        stripped from it.

    Raises:
        SystemExit: From ``get_configpath`` when RFARM_CONFDIR is not set.
    """
    # get_configpath() performs the RFARM_CONFDIR check, so it is not
    # repeated here.
    cmd = ' '.join(['rfgetconf', get_configpath(conffile),
                    item1, item2, item3])
    proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    # communicate() drains both pipes while waiting. Calling wait() before
    # read() can block forever once the child fills a pipe buffer.
    output, _ = proc.communicate()
    # Under Python 3 the pipe yields bytes, but the callers in this file
    # concatenate the result with str, so return text.
    if not isinstance(output, str):
        output = output.decode()
    return output
46
47
48# print("waiting")
49# confout = p.stdout.read()
50
51# NSMD related utilities
52# Run NSMD
53
def run_nsmd(nsmdir, port, nsmhost):
    """Launch ``nsmd2`` on ``nsmhost`` through ssh.

    A directory ``<nsmdir>/<nsmhost>`` is created locally when it does not
    exist yet; the remote command changes into that path and exports it as
    NSMLOGDIR before starting the daemon found under BELLE2_EXTERNALS_BIN.
    The ssh process is not waited for; the function only sleeps one second
    after spawning it.

    Args:
        nsmdir: Base directory that holds one sub-directory per host.
        port: Port string passed to nsmd2 as both ``-p`` and ``-s``.
        nsmhost: Host to ssh to; also passed to nsmd2 as ``-h``.
    """
    # Make sure the per-host logging directory exists
    logdir = nsmdir + '/' + nsmhost
    if not os.path.exists(logdir):
        os.mkdir(logdir)

    bindir = str(os.environ.get('BELLE2_EXTERNALS_BIN'))
    daemon = '%s/nsmd2 -f -p %s -s %s -h %s' % (bindir, port, port, nsmhost)
    cmd = 'ssh %s "cd %s; export NSMLOGDIR=%s;%s"' % (nsmhost, logdir,
                                                     logdir, daemon)
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
66
67
68# p.wait()
69
70# Kill NSMD
71
def kill_nsmd(port, nsmhost):
    """Kill the ``nsmd2`` processes on ``nsmhost`` whose ps line matches ``port``.

    The PIDs are obtained by running ``ps -fC nsmd2 | grep <port>`` on the
    remote host and taking the second column; each positive PID is then
    killed with a separate ``ssh <nsmhost> "kill <pid>"``.

    Args:
        port: Port string used as the grep pattern on the ps listing.
        nsmhost: Host to ssh to.
    """
    list_cmd = 'ssh ' + nsmhost + ' "ps -fC nsmd2 | grep ' + port \
        + "| awk '{print \\$2}' \""
    # Read the PID list straight from the pipe rather than through a fixed
    # 'temp.pid' file in the working directory, which was never closed or
    # removed and would be shared by concurrent invocations.
    lister = subprocess.Popen(list_cmd, shell=True, stdout=subprocess.PIPE)
    listing, _ = lister.communicate()
    if not isinstance(listing, str):
        listing = listing.decode()

    for line in listing.splitlines():
        try:
            pid = int(line)
        except ValueError:
            # Blank or non-numeric line: nothing to kill.
            continue
        if pid > 0:
            cmd = 'ssh ' + nsmhost + ' "kill ' + str(pid) + '"'
            killer = subprocess.Popen(cmd, shell=True)
            killer.wait()
86
87
88# Start NSMD on all nodes
89
def start_nsmd(conffile):
    """Start nsmd on the nodes listed in configuration ``conffile``.

    Nodes are handled in this order: control node, event server node (only
    when its host name is not contained in the control host name), output
    server node, then every event processor node whose id does not occur
    in the processor bad list.

    Args:
        conffile: Configuration name passed to ``get_rfgetconf``.
    """
    # Global parameters
    nsmdir = get_rfgetconf(conffile, 'system', 'nsmdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')

    def launch(host):
        # Start nsmd on one host and report it.
        run_nsmd(nsmdir, port, host)
        print('nsmd on %s started' % host)

    # Control node
    ctlhost = get_rfgetconf(conffile, 'master', 'ctlhost')
    launch(ctlhost)

    # Event server node
    evshost = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    if evshost not in ctlhost:
        launch(evshost)

    # Output server node
    launch(get_rfgetconf(conffile, 'collector', 'ctlhost'))

    # Event processor nodes
    nnodes = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    badlist = get_rfgetconf(conffile, 'processor', 'badlist')
    evphostbase = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    for number in range(procid, procid + nnodes):
        nodeid = '%1.1d' % number  # DESY only
        if nodeid in badlist:
            continue
        launch(evphostbase + nodeid)
123
124
def stop_nsmd(conffile):
    """Kill nsmd on the nodes listed in configuration ``conffile``.

    Nodes are handled in this order: control node, event server node (only
    when its host name is not contained in the control host name), output
    server node, then every event processor node whose id does not occur
    in the processor bad list.

    Args:
        conffile: Configuration name passed to ``get_rfgetconf``.
    """
    port = get_rfgetconf(conffile, 'system', 'nsmport')

    def halt(host):
        # Kill nsmd on one host and report it.
        kill_nsmd(port, host)
        print('nsmd on %s stopped' % host)

    # Kill nsmd on control node
    ctlhost = get_rfgetconf(conffile, 'master', 'ctlhost')
    halt(ctlhost)

    # Kill nsmd on event server node
    evshost = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    if evshost not in ctlhost:
        halt(evshost)

    # Kill nsmd on output server node
    halt(get_rfgetconf(conffile, 'collector', 'ctlhost'))

    # Kill nsmd on event processor nodes
    nnodes = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    badlist = get_rfgetconf(conffile, 'processor', 'badlist')
    evphostbase = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    for number in range(procid, procid + nnodes):
        nodeid = '%1.1d' % number  # DESY only
        if nodeid in badlist:
            continue
        halt(evphostbase + nodeid)
156
157
158# RFARM server operations
159# Run eventserver
160
def run_eventserver(conffile):
    """Launch ``rf_eventserver`` through ssh on the distributor control host.

    Creates ``<execdir_base>/distributor`` locally when it is missing. The
    remote command changes into ``execdir_base``, exports NSM2_PORT and
    redirects the server output to ``distributor/nsmlog.log``. The ssh
    process is not waited for; the function sleeps one second afterwards.

    Args:
        conffile: Configuration name passed to ``get_rfgetconf``.
    """
    evshost = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')

    workdir = basedir + '/distributor'
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    remote = 'cd %s; export NSM2_PORT=%s; rf_eventserver %s' \
        % (basedir, port, get_configpath(conffile))
    cmd = 'ssh %s "%s &> distributor/nsmlog.log" ' % (evshost, remote)
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
173
174
175# Stop eventserver
176
def stop_eventserver(conffile):
    """Unconfigure and kill the event server, then remove its IPC resources.

    First sends RF_UNCONFIGURE to ``distributor`` with ``rfcommand``. Then,
    for every PID listed in ``<execdir_base>/distributor/pid.data``, runs
    ``kill``, ``removerb`` and ``removeshm`` on the distributor control host
    through ssh and waits for each ssh to finish.

    Args:
        conffile: Configuration name passed to ``get_rfgetconf`` and
            ``rfcommand``.
    """
    evshost = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    unit = get_rfgetconf(conffile, 'system', 'unitname')
    ringbuf = get_rfgetconf(conffile, 'distributor', 'ringbuffer')
    rbufname = unit + ':' + ringbuf
    shmname = unit + ':distributor'
    p = subprocess.Popen('rfcommand ' + conffile +
                         ' distributor RF_UNCONFIGURE', shell=True)
    p.wait()
    pidfile = basedir + '/distributor/pid.data'
    # Read inside a with-block so the file is closed, and strip each line:
    # file iteration keeps the trailing newline, which would otherwise end
    # up in the middle of the remote command, just before '; removerb'.
    with open(pidfile, 'r') as handle:
        pids = [entry.strip() for entry in handle]
    for pid in pids:
        if not pid:
            continue  # skip blank lines
        cmd = 'ssh ' + evshost + ' "kill ' + pid + '; removerb ' + rbufname \
            + "; removeshm " + shmname + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
194
195
196# Run outputserver
197
def run_outputserver(conffile):
    """Launch ``rf_outputserver`` through ssh on the collector control host.

    Creates ``<execdir_base>/collector`` locally when it is missing. The
    remote command changes into ``execdir_base``, exports NSM2_PORT and
    redirects the server output to ``collector/nsmlog.log``. The ssh
    process is not waited for; the function sleeps one second afterwards.

    Args:
        conffile: Configuration name passed to ``get_rfgetconf``.
    """
    opshost = get_rfgetconf(conffile, 'collector', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')

    workdir = basedir + '/collector'
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    remote = 'cd %s; export NSM2_PORT=%s; rf_outputserver %s' \
        % (basedir, port, get_configpath(conffile))
    cmd = 'ssh %s "%s &> collector/nsmlog.log" ' % (opshost, remote)
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
210
211
212# Stop outputserver
213
def stop_outputserver(conffile):
    """Unconfigure and kill the output server, then remove its IPC resources.

    First sends RF_UNCONFIGURE to ``collector`` with ``rfcommand``. Then,
    for every PID listed in ``<execdir_base>/collector/pid.data``, runs
    ``kill``, two ``removerb`` calls (input and output ring buffer),
    ``removeshm`` and ``clear_basf2_ipc`` on the collector control host
    through ssh and waits for each ssh to finish.

    Args:
        conffile: Configuration name passed to ``get_rfgetconf`` and
            ``rfcommand``.
    """
    opshost = get_rfgetconf(conffile, 'collector', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    unit = get_rfgetconf(conffile, 'system', 'unitname')
    rbufin = get_rfgetconf(conffile, 'collector', 'ringbufin')
    rbufout = get_rfgetconf(conffile, 'collector', 'ringbufout')
    rbufinname = unit + ':' + rbufin
    rbufoutname = unit + ':' + rbufout
    shmname = unit + ':collector'
    p = subprocess.Popen('rfcommand ' + conffile +
                         ' collector RF_UNCONFIGURE', shell=True)
    p.wait()
    pidfile = basedir + '/collector/pid.data'
    # Read inside a with-block so the file is closed, and strip each line:
    # file iteration keeps the trailing newline, which would otherwise end
    # up in the middle of the remote command, just before '; removerb'.
    with open(pidfile, 'r') as handle:
        pids = [entry.strip() for entry in handle]
    for pid in pids:
        if not pid:
            continue  # skip blank lines
        cmd = 'ssh ' + opshost + ' "kill ' + pid + '; removerb ' + rbufinname \
            + '; removerb ' + rbufoutname + '; removeshm ' + shmname \
            + '; clear_basf2_ipc"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
233
234
# Start event processor
236
def run_eventprocessor(conffile):
    """Launch ``rf_eventprocessor`` through ssh on each usable processor node.

    Node numbers run from ``idbase`` for ``nnodes`` entries. A node is
    skipped when its one-digit id occurs in the processor bad list. For
    each remaining node the directory ``<execdir_base>/evp_<nodebase><NN>``
    is created locally when missing, and the remote command redirects the
    processor output to ``evp_<nodebase><NN>/nsmlog.log``. The ssh processes
    are not waited for; the function sleeps one second after each launch.

    Args:
        conffile: Configuration name passed to ``get_rfgetconf``.
    """
    hostbase = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    nodebase = get_rfgetconf(conffile, 'processor', 'nodebase')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')
    nnodes = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    badlist = get_rfgetconf(conffile, 'processor', 'badlist')
    # The configuration path does not depend on the node, so resolve it once.
    confpath = get_configpath(conffile)

    for i in range(procid, procid + nnodes):
        nodenum = '%2.2d' % i  # two-digit form, used in the node name
        nodeid = '%1.1d' % i  # DESY only: host names use the short form
        if badlist.find(nodeid) != -1:
            continue
        evphost = hostbase + nodeid
        nodename = nodebase + nodenum
        if not os.path.exists(basedir + '/evp_' + nodename):
            os.mkdir(basedir + '/evp_' + nodename)
        cmd = 'ssh ' + evphost + ' "cd ' + basedir + '; export NSM2_PORT=' \
            + port + '; rf_eventprocessor ' + confpath \
            + ' &> evp_' + nodename + '/nsmlog.log" '
        print(cmd)
        subprocess.Popen(cmd, shell=True)
        time.sleep(1)
261
262
# Stop event processor
264
def stop_eventprocessor(conffile):
    """Unconfigure and kill the event processors on each usable node.

    Node numbers run from ``idbase`` for ``nnodes`` entries. A node is
    skipped when its one-digit id occurs in the processor bad list. For
    each remaining node, RF_UNCONFIGURE is sent to ``evp_<nodebase><NN>``
    with ``rfcommand``; then, for every PID listed in
    ``<execdir_base>/evp_<nodebase><NN>/pid.data``, ``kill``, two
    ``removerb`` calls, ``removeshm`` and ``clear_basf2_ipc`` are run on the
    node through ssh, waiting for each ssh to finish.

    Args:
        conffile: Configuration name passed to ``get_rfgetconf`` and
            ``rfcommand``.
    """
    hostbase = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    nodebase = get_rfgetconf(conffile, 'processor', 'nodebase')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    nnodes = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    badlist = get_rfgetconf(conffile, 'processor', 'badlist')

    unit = get_rfgetconf(conffile, 'system', 'unitname')
    rbufin = get_rfgetconf(conffile, 'collector', 'ringbufin')
    rbufout = get_rfgetconf(conffile, 'collector', 'ringbufout')
    rbufinname = unit + ':' + rbufin
    rbufoutname = unit + ':' + rbufout

    for i in range(procid, procid + nnodes):
        nodenum = '%2.2d' % i  # two-digit form, used in the node name
        # DESY only: build the host name and the bad-list key from the
        # one-digit id, exactly as run_eventprocessor does, so the kill is
        # sent to the host the process was started on.
        nodeid = '%1.1d' % i
        if badlist.find(nodeid) != -1:
            continue
        evphost = hostbase + nodeid
        nodename = 'evp_' + nodebase + nodenum
        shmname = unit + ':' + nodename
        print(shmname)
        p = subprocess.Popen('rfcommand ' + conffile + ' ' + nodename +
                             ' RF_UNCONFIGURE', shell=True)
        p.wait()
        pidfile = basedir + '/' + nodename + '/pid.data'
        # Read inside a with-block so the file is closed, and strip each
        # line: file iteration keeps the trailing newline, which would
        # otherwise end up in the middle of the remote command.
        with open(pidfile, 'r') as handle:
            pids = [entry.strip() for entry in handle]
        for pid in pids:
            if not pid:
                continue  # skip blank lines
            cmd = 'ssh ' + evphost + ' "kill ' + pid + '; removerb ' \
                + rbufinname + '; removerb ' + rbufoutname \
                + '; removeshm ' + shmname + '; clear_basf2_ipc"'
            print(cmd)
            p = subprocess.Popen(cmd, shell=True)
            p.wait()
302
303
304# Run dqmserver
305
def run_dqmserver(conffile):
    """Launch ``rf_dqmserver`` through ssh on the dqmserver control host.

    Creates ``<execdir_base>/dqmserver`` locally when it is missing. The
    remote command changes into ``execdir_base``, exports NSM2_PORT and
    redirects the server output to ``dqmserver/nsmlog.log``. The ssh
    process is not waited for; the function sleeps one second afterwards.

    Args:
        conffile: Configuration name passed to ``get_rfgetconf``.
    """
    dqmhost = get_rfgetconf(conffile, 'dqmserver', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')

    workdir = basedir + '/dqmserver'
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    remote = 'cd %s; export NSM2_PORT=%s; rf_dqmserver %s' \
        % (basedir, port, get_configpath(conffile))
    cmd = 'ssh %s "%s &> dqmserver/nsmlog.log" ' % (dqmhost, remote)
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
318
319
320# Stop dqmserver
321
def stop_dqmserver(conffile):
    """Unconfigure and kill the DQM server.

    First sends RF_UNCONFIGURE to ``dqmserver`` with ``rfcommand``. Then,
    for every PID listed in ``<execdir_base>/dqmserver/pid.data``, runs
    ``kill`` on the dqmserver control host through ssh and waits for it.

    Args:
        conffile: Configuration name passed to ``get_rfgetconf`` and
            ``rfcommand``.
    """
    dqmhost = get_rfgetconf(conffile, 'dqmserver', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    p = subprocess.Popen('rfcommand ' + conffile +
                         ' dqmserver RF_UNCONFIGURE', shell=True)
    p.wait()
    pidfile = basedir + '/dqmserver/pid.data'
    # Read inside a with-block so the file is closed; strip the newline
    # that file iteration keeps so it is not embedded in the command.
    with open(pidfile, 'r') as handle:
        pids = [entry.strip() for entry in handle]
    for pid in pids:
        if not pid:
            continue  # skip blank lines
        cmd = 'ssh ' + dqmhost + ' "kill ' + pid + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
333
334
335# Run roisender
336
def run_roisender(conffile):
    """Launch ``rf_roisender`` through ssh on the roisender control host.

    Creates ``<execdir_base>/roisender`` locally when it is missing. The
    remote command changes into ``execdir_base``, exports NSM2_PORT and
    redirects the sender output to ``roisender/nsmlog.log``. The ssh
    process is not waited for; the function sleeps one second afterwards.

    Args:
        conffile: Configuration name passed to ``get_rfgetconf``.
    """
    roihost = get_rfgetconf(conffile, 'roisender', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')

    workdir = basedir + '/roisender'
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    remote = 'cd %s; export NSM2_PORT=%s; rf_roisender %s' \
        % (basedir, port, get_configpath(conffile))
    cmd = 'ssh %s "%s &> roisender/nsmlog.log" ' % (roihost, remote)
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
349
350
351# Stop roisender
352
def stop_roisender(conffile):
    """Unconfigure and kill the ROI sender, then remove its shared memory.

    First sends RF_UNCONFIGURE to ``roisender`` with ``rfcommand``. Then,
    for every PID listed in ``<execdir_base>/roisender/pid.data``, runs
    ``kill`` and ``removeshm`` on the roisender control host through ssh
    and waits for each ssh to finish.

    Args:
        conffile: Configuration name passed to ``get_rfgetconf`` and
            ``rfcommand``.
    """
    roihost = get_rfgetconf(conffile, 'roisender', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    unit = get_rfgetconf(conffile, 'system', 'unitname')
    shmname = unit + ':roisender'
    p = subprocess.Popen('rfcommand ' + conffile +
                         ' roisender RF_UNCONFIGURE', shell=True)
    p.wait()
    pidfile = basedir + '/roisender/pid.data'
    # Read inside a with-block so the file is closed, and strip each line:
    # file iteration keeps the trailing newline, which would otherwise end
    # up in the middle of the remote command, just before '; removeshm'.
    with open(pidfile, 'r') as handle:
        pids = [entry.strip() for entry in handle]
    for pid in pids:
        if not pid:
            continue  # skip blank lines
        cmd = 'ssh ' + roihost + ' "kill ' + pid + '; removeshm ' \
            + shmname + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
366
367
368# Run local master
369
def run_master(conffile):
    """Launch ``rf_master_local`` through ssh on the master control host.

    Creates ``<execdir_base>/master`` locally when it is missing. The
    remote command changes into ``execdir_base``, exports NSM2_PORT and
    redirects the master output to ``master/nsmlog.log``. The ssh process
    is not waited for; the function sleeps one second afterwards.

    Args:
        conffile: Configuration name passed to ``get_rfgetconf``.
    """
    masterhost = get_rfgetconf(conffile, 'master', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')

    workdir = basedir + '/master'
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    remote = 'cd %s; export NSM2_PORT=%s; rf_master_local %s' \
        % (basedir, port, get_configpath(conffile))
    cmd = 'ssh %s "%s &> master/nsmlog.log" ' % (masterhost, remote)
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
382
383
384# Stop local master
385
def stop_master(conffile):
    """Kill the local master process.

    For every PID listed in ``<execdir_base>/master/pid.data``, runs
    ``kill`` on the master control host through ssh and waits for it.
    No RF_UNCONFIGURE is sent to the master.

    Args:
        conffile: Configuration name passed to ``get_rfgetconf``.
    """
    masterhost = get_rfgetconf(conffile, 'master', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    pidfile = basedir + '/master/pid.data'
    # Read inside a with-block so the file is closed; strip the newline
    # that file iteration keeps so it is not embedded in the command.
    with open(pidfile, 'r') as handle:
        pids = [entry.strip() for entry in handle]
    for pid in pids:
        if not pid:
            continue  # skip blank lines
        cmd = 'ssh ' + masterhost + ' "kill ' + pid + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
397
398
def start_rfarm_components(conffile):
    """Start the RFARM components for configuration ``conffile``.

    Launch order: event processors, output server, event server, DQM
    server, ROI sender.
    """
    launchers = (
        run_eventprocessor,
        run_outputserver,
        run_eventserver,
        run_dqmserver,
        run_roisender,
    )
    for launch in launchers:
        launch(conffile)
405
406
407# Stop RFARM components
408
def stop_rfarm_components(conffile):
    """Stop the RFARM components for configuration ``conffile``.

    Stop order: ROI sender, DQM server, event server, output server,
    event processors.
    """
    stoppers = (
        stop_roisender,
        stop_dqmserver,
        stop_eventserver,
        stop_outputserver,
        stop_eventprocessor,
    )
    for stop in stoppers:
        stop(conffile)
415
416
417# Start RFARM local operation
418
def start_rfarm_local(conffile):
    """Start all RFARM components and then the local master."""
    for bring_up in (start_rfarm_components, run_master):
        bring_up(conffile)
422
423
424# Stop RFARM local operation
425
def stop_rfarm_local(conffile):
    """Stop all RFARM components and then the local master."""
    for shut_down in (stop_rfarm_components, stop_master):
        shut_down(conffile)