Belle II Software development
rfarmutil.py
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4
11
12import os
13import sys
14import subprocess
15import time
16
17
18# Basic Utilities
19# Get full path of the configuration file
20
def get_configpath(conffile):
    """Return the full path of the configuration file named *conffile*.

    The directory is read from the RFARM_CONFDIR environment variable and
    the '.conf' suffix is appended to *conffile*.

    Prints a message and exits the interpreter (SystemExit, status 1) when
    RFARM_CONFDIR is not set.
    """
    confdir = os.environ.get('RFARM_CONFDIR')
    # Test for None directly: comparing str(value) with 'None' would also
    # reject a directory that is literally called "None".
    if confdir is None:
        print('RFARM_CONFDIR is not defined. Exit.')
        # A bare sys.exit() would report success (status 0) to the caller.
        sys.exit(1)
    return confdir + '/' + conffile + '.conf'
28
29
30# Get Configuration from config file
def get_rfgetconf(conffile, item1, item2='NULL', item3='NULL'):
    """Read one value from a configuration file via the ``rfgetconf`` tool.

    Runs ``rfgetconf <config path> <item1> <item2> <item3>`` through the
    shell and returns whatever the command wrote to stdout as text. The
    output is returned as is (no stripping). stderr is captured and
    discarded.

    The configuration path comes from get_configpath(), which exits the
    interpreter when RFARM_CONFDIR is not defined.
    """
    cmd = 'rfgetconf ' + get_configpath(conffile) + ' ' + item1 + ' ' + item2 \
        + ' ' + item3
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    # communicate() drains both pipes while waiting for the child; calling
    # wait() first and reading afterwards can block forever once the child
    # fills a pipe buffer.
    output, _ = p.communicate()
    # On Python 3 the pipe yields bytes, but the callers in this module
    # concatenate the result with str, so hand back text.
    if not isinstance(output, str):
        output = output.decode()
    return output
45
46
47# print("waiting")
48# confout = p.stdout.read()
49
50# NSMD related utilities
51# Run NSMD
52
def run_nsmd(nsmdir, port, nsmhost):
    """Launch ``nsmd2`` on *nsmhost* through ssh.

    The directory ``<nsmdir>/<nsmhost>`` is created locally if it does not
    exist. The remote command changes into that directory, sets NSMLOGDIR
    to it and starts ``nsmd2`` with *port* passed to both ``-p`` and ``-s``.
    The ssh process is not waited for; the function sleeps one second after
    launching it.
    """
    logdir = nsmdir + '/' + nsmhost
    # Make sure the logging directory is there
    if not os.path.exists(logdir):
        os.mkdir(logdir)
    cmd = ''.join([
        'ssh ', nsmhost, ' "cd ', logdir,
        '; setenv NSMLOGDIR ', logdir, ';',
        'nsmd2 -f -p ', port, ' -s ', port, ' -h ', nsmhost, '"',
    ])
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
66
67
68# p.wait()
69
70# Kill NSMD
71
def kill_nsmd(port, nsmhost):
    """Kill the ``nsmd2`` processes on *nsmhost* whose ps line matches *port*.

    The process ids are collected by running ``ps -fC nsmd2 | grep <port>``
    on the remote host through ssh and taking the second column of each
    line. Every positive pid found is then killed with a separate ssh call,
    which is waited for.
    """
    # The escaped \$ stops the local shell from expanding $2, so awk on the
    # remote side receives it unchanged.
    cmd = 'ssh ' + nsmhost + ' "ps -fC nsmd2 | grep ' + port \
        + "| awk '{print \\$2}' \""
    # Read the pid list from a pipe rather than a fixed 'temp.pid' file in
    # the working directory, which was never closed or removed and would be
    # shared by concurrent invocations.
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
    output, _ = p.communicate()
    if not isinstance(output, str):
        output = output.decode()
    for line in output.splitlines():
        line = line.strip()
        if not line.isdigit():
            # Blank line or anything that is not a plain process id
            continue
        pid = int(line)
        if pid > 0:
            cmd = 'ssh ' + nsmhost + ' "kill ' + str(pid) + '"'
            p = subprocess.Popen(cmd, shell=True)
            p.wait()
86
87
88# Start NSMD on all nodes
89
def start_nsmd(conffile):
    """Start nsmd on the nodes listed in the configuration *conffile*.

    Nodes are handled in this order: control node, event server node,
    output server node, then the event processor nodes. A message is
    printed for each node on which nsmd was launched.
    """
    def conf(section, key):
        # Shorthand for a two-level lookup in this configuration
        return get_rfgetconf(conffile, section, key)

    # Settings shared by all nodes
    nsmdir = conf('system', 'nsmdir_base')
    port = conf('system', 'nsmport')

    # Control node
    ctlhost = conf('master', 'ctlhost')
    run_nsmd(nsmdir, port, ctlhost)
    print('nsmd on %s started' % ctlhost)

    # Event server node, skipped when its name occurs inside ctlhost
    evshost = conf('distributor', 'ctlhost')
    if evshost not in ctlhost:
        run_nsmd(nsmdir, port, evshost)
        print('nsmd on %s started' % evshost)

    # Output server node
    opshost = conf('collector', 'ctlhost')
    run_nsmd(nsmdir, port, opshost)
    print('nsmd on %s started' % opshost)

    # Event processor nodes: two-digit ids counted up from idbase
    nnodes = int(conf('processor', 'nnodes'))
    procid = int(conf('processor', 'idbase'))
    badlist = conf('processor', 'badlist')
    evphostbase = conf('processor', 'ctlhostbase')
    for number in range(procid, procid + nnodes):
        nodeid = '%2.2d' % number
        if nodeid in badlist:
            # Node id appears in the bad list; leave it alone
            continue
        evphost = evphostbase + nodeid
        run_nsmd(nsmdir, port, evphost)
        print('nsmd on %s started' % evphost)
122
123
def stop_nsmd(conffile):
    """Stop nsmd on the nodes listed in the configuration *conffile*.

    Nodes are handled in this order: control node, event server node,
    output server node, then the event processor nodes. A message is
    printed for each node on which kill_nsmd() was called.
    """
    def conf(section, key):
        # Shorthand for a two-level lookup in this configuration
        return get_rfgetconf(conffile, section, key)

    port = conf('system', 'nsmport')

    # Kill nsmd on control node
    ctlhost = conf('master', 'ctlhost')
    kill_nsmd(port, ctlhost)
    print('nsmd on %s stopped' % ctlhost)

    # Kill nsmd on event server node, skipped when its name occurs
    # inside ctlhost
    evshost = conf('distributor', 'ctlhost')
    if evshost not in ctlhost:
        kill_nsmd(port, evshost)
        print('nsmd on %s stopped' % evshost)

    # Kill nsmd on output server node
    opshost = conf('collector', 'ctlhost')
    kill_nsmd(port, opshost)
    print('nsmd on %s stopped' % opshost)

    # Kill nsmd on event processor nodes: two-digit ids counted up from idbase
    nnodes = int(conf('processor', 'nnodes'))
    procid = int(conf('processor', 'idbase'))
    badlist = conf('processor', 'badlist')
    evphostbase = conf('processor', 'ctlhostbase')
    for number in range(procid, procid + nnodes):
        nodeid = '%2.2d' % number
        if nodeid in badlist:
            # Node id appears in the bad list; leave it alone
            continue
        evphost = evphostbase + nodeid
        kill_nsmd(port, evphost)
        print('nsmd on %s stopped' % evphost)
154
155
156# RFARM server operations
157# Run eventserver
158
def run_eventserver(conffile):
    """Launch ``rf_eventserver`` on the distributor host through ssh.

    Creates ``<execdir_base>/distributor`` locally if it does not exist.
    The remote command changes into execdir_base, sets NSM2_PORT and
    redirects the server output to ``distributor/nsmlog.log``. The command
    line is printed, the ssh process is not waited for, and the function
    sleeps one second afterwards.
    """
    evshost = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')

    workdir = basedir + '/distributor'
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    cmd = ''.join([
        'ssh ', evshost, ' "cd ', basedir, '; setenv NSM2_PORT ', port,
        '; rf_eventserver ', get_configpath(conffile),
        ' > & distributor/nsmlog.log" ',
    ])
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
171
172
173# Stop eventserver
174
def stop_eventserver(conffile):
    """Kill the event server and remove its ring buffer and shared memory.

    For every process id listed in ``<execdir_base>/distributor/pid.data``
    an ssh command is run on the distributor host that kills the process,
    then calls ``removerb`` on ``<unitname>:<ringbuffer>`` and ``removeshm``
    on ``<unitname>:distributor``. Each command is printed and waited for.

    Raises whatever open() raises when the pid file cannot be read.
    """
    evshost = get_rfgetconf(conffile, 'distributor', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    unit = get_rfgetconf(conffile, 'system', 'unitname')
    ringbuf = get_rfgetconf(conffile, 'distributor', 'ringbuffer')
    rbufname = unit + ':' + ringbuf
    shmname = unit + ':distributor'
    pidfile = basedir + '/distributor/pid.data'
    # Read the pids up front so the file is closed again, and strip the
    # line ending so that it does not end up inside the remote command.
    with open(pidfile, 'r') as f:
        pids = [line.strip() for line in f]
    for pid in pids:
        if not pid:
            # Blank line: nothing to kill
            continue
        cmd = 'ssh ' + evshost + ' "kill ' + pid + '; removerb ' + rbufname \
            + "; removeshm " + shmname + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
192
193
194# Run outputserver
195
def run_outputserver(conffile):
    """Launch ``rf_outputserver`` on the collector host through ssh.

    Creates ``<execdir_base>/collector`` locally if it does not exist. The
    remote command changes into execdir_base, sets NSM2_PORT and redirects
    the server output to ``collector/nsmlog.log``. The command line is
    printed, the ssh process is not waited for, and the function sleeps one
    second afterwards.
    """
    opshost = get_rfgetconf(conffile, 'collector', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')

    workdir = basedir + '/collector'
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    cmd = ''.join([
        'ssh ', opshost, ' "cd ', basedir, '; setenv NSM2_PORT ', port,
        '; rf_outputserver ', get_configpath(conffile),
        ' > & collector/nsmlog.log" ',
    ])
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
208
209
210# Stop outputserver
211
def stop_outputserver(conffile):
    """Kill the output server and remove its ring buffers and shared memory.

    For every process id listed in ``<execdir_base>/collector/pid.data`` an
    ssh command is run on the collector host that kills the process, calls
    ``removerb`` on the input and output ring buffers, ``removeshm`` on
    ``<unitname>:collector`` and finally ``clear_basf2_ipc``. Each command
    is printed and waited for.

    Raises whatever open() raises when the pid file cannot be read.
    """
    opshost = get_rfgetconf(conffile, 'collector', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    unit = get_rfgetconf(conffile, 'system', 'unitname')
    rbufin = get_rfgetconf(conffile, 'collector', 'ringbufin')
    rbufout = get_rfgetconf(conffile, 'collector', 'ringbufout')
    rbufinname = unit + ':' + rbufin
    rbufoutname = unit + ':' + rbufout
    shmname = unit + ':collector'
    pidfile = basedir + '/collector/pid.data'
    # Read the pids up front so the file is closed again, and strip the
    # line ending so that it does not end up inside the remote command.
    with open(pidfile, 'r') as f:
        pids = [line.strip() for line in f]
    for pid in pids:
        if not pid:
            # Blank line: nothing to kill
            continue
        cmd = 'ssh ' + opshost + ' "kill ' + pid + '; removerb ' + rbufinname \
            + '; removerb ' + rbufoutname + '; removeshm ' + shmname \
            + '; clear_basf2_ipc"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
231
232
# Start event processor
234
def run_eventprocessor(conffile):
    """Launch ``rf_eventprocessor`` on every event processor node.

    Node ids run from ``idbase`` for ``nnodes`` entries and are formatted
    with two digits; ids that occur in ``badlist`` are skipped. For each
    remaining node the directory ``<execdir_base>/evp_<nodebase><id>`` is
    created locally if missing, and the processor is started through ssh on
    ``<ctlhostbase><id>`` with its output redirected to ``nsmlog.log`` in
    that directory. Each command line is printed, the ssh process is not
    waited for, and the function sleeps one second after each launch.
    """
    hostbase = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    nodebase = get_rfgetconf(conffile, 'processor', 'nodebase')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')
    nnodes = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    badlist = get_rfgetconf(conffile, 'processor', 'badlist')
    # The configuration path is the same for every node
    confpath = get_configpath(conffile)

    for i in range(procid, procid + nnodes):
        nodeid = '%2.2d' % i
        if badlist.find(nodeid) == -1:
            evphost = hostbase + nodeid
            nodename = nodebase + nodeid
            if not os.path.exists(basedir + '/evp_' + nodename):
                os.mkdir(basedir + '/evp_' + nodename)
            cmd = 'ssh ' + evphost + ' "cd ' + basedir + '; setenv NSM2_PORT ' \
                + port + '; rf_eventprocessor ' + confpath \
                + ' > & evp_' + nodename + '/nsmlog.log" '
            print(cmd)
            subprocess.Popen(cmd, shell=True)
            time.sleep(1)
258
259
# Stop event processor
261
def stop_eventprocessor(conffile):
    """Kill the event processors and remove their IPC resources.

    Node ids run from ``idbase`` for ``nnodes`` entries and are formatted
    with two digits; ids that occur in ``badlist`` are skipped. For each
    remaining node the shared memory name is printed, and for every process
    id listed in ``<execdir_base>/evp_<nodebase><id>/pid.data`` an ssh
    command is run on ``<ctlhostbase><id>`` that kills the process, calls
    ``removerb`` on the collector input and output ring buffer names,
    ``removeshm`` on the node's shared memory and ``clear_basf2_ipc``. Each
    command is printed and waited for.

    Raises whatever open() raises when a pid file cannot be read.
    """
    hostbase = get_rfgetconf(conffile, 'processor', 'ctlhostbase')
    nodebase = get_rfgetconf(conffile, 'processor', 'nodebase')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    nnodes = int(get_rfgetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_rfgetconf(conffile, 'processor', 'idbase'))
    badlist = get_rfgetconf(conffile, 'processor', 'badlist')

    unit = get_rfgetconf(conffile, 'system', 'unitname')
    rbufin = get_rfgetconf(conffile, 'collector', 'ringbufin')
    rbufout = get_rfgetconf(conffile, 'collector', 'ringbufout')
    rbufinname = unit + ':' + rbufin
    rbufoutname = unit + ':' + rbufout

    for i in range(procid, procid + nnodes):
        nodeid = '%2.2d' % i
        if badlist.find(nodeid) == -1:
            evphost = hostbase + nodeid
            nodename = 'evp_' + nodebase + nodeid
            shmname = unit + ':' + nodename
            print(shmname)
            pidfile = basedir + '/' + nodename + '/pid.data'
            # Read the pids up front so the file is closed again, and strip
            # the line ending so it does not end up inside the remote command.
            with open(pidfile, 'r') as f:
                pids = [line.strip() for line in f]
            for pid in pids:
                if not pid:
                    # Blank line: nothing to kill
                    continue
                cmd = 'ssh ' + evphost + ' "kill ' + pid + '; removerb ' \
                    + rbufinname + '; removerb ' + rbufoutname \
                    + '; removeshm ' + shmname + '; clear_basf2_ipc"'
                print(cmd)
                p = subprocess.Popen(cmd, shell=True)
                p.wait()
298
299
300# Run dqmserver
301
def run_dqmserver(conffile):
    """Launch ``rf_dqmserver`` on the dqmserver host through ssh.

    Creates ``<execdir_base>/dqmserver`` locally if it does not exist. The
    remote command changes into execdir_base, sets NSM2_PORT and redirects
    the server output to ``dqmserver/nsmlog.log``. The command line is
    printed, the ssh process is not waited for, and the function sleeps one
    second afterwards.
    """
    dqmhost = get_rfgetconf(conffile, 'dqmserver', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')

    workdir = basedir + '/dqmserver'
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    cmd = ''.join([
        'ssh ', dqmhost, ' "cd ', basedir, '; setenv NSM2_PORT ', port,
        '; rf_dqmserver ', get_configpath(conffile),
        ' > & dqmserver/nsmlog.log" ',
    ])
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
314
315
316# Stop dqmserver
317
def stop_dqmserver(conffile):
    """Kill the DQM server processes.

    For every process id listed in ``<execdir_base>/dqmserver/pid.data`` an
    ssh ``kill`` is run on the dqmserver host. Each command is printed and
    waited for.

    Raises whatever open() raises when the pid file cannot be read.
    """
    dqmhost = get_rfgetconf(conffile, 'dqmserver', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    pidfile = basedir + '/dqmserver/pid.data'
    # Read the pids up front so the file is closed again, and strip the
    # line ending so that it does not end up inside the remote command.
    with open(pidfile, 'r') as f:
        pids = [line.strip() for line in f]
    for pid in pids:
        if not pid:
            # Blank line: nothing to kill
            continue
        cmd = 'ssh ' + dqmhost + ' "kill ' + pid + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
329
330
331# Run roisender
332
def run_roisender(conffile):
    """Launch ``rf_roisender`` on the roisender host through ssh.

    Creates ``<execdir_base>/roisender`` locally if it does not exist. The
    remote command changes into execdir_base, sets NSM2_PORT and redirects
    the sender output to ``roisender/nsmlog.log``. The command line is
    printed, the ssh process is not waited for, and the function sleeps one
    second afterwards.
    """
    roihost = get_rfgetconf(conffile, 'roisender', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')

    workdir = basedir + '/roisender'
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    cmd = ''.join([
        'ssh ', roihost, ' "cd ', basedir, '; setenv NSM2_PORT ', port,
        '; rf_roisender ', get_configpath(conffile),
        ' > & roisender/nsmlog.log" ',
    ])
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
345
346
347# Stop roisender
348
def stop_roisender(conffile):
    """Kill the ROI sender processes and remove their shared memory.

    For every process id listed in ``<execdir_base>/roisender/pid.data`` an
    ssh command is run on the roisender host that kills the process and
    calls ``removeshm`` on ``<unitname>:roisender``. Each command is printed
    and waited for.

    Raises whatever open() raises when the pid file cannot be read.
    """
    roihost = get_rfgetconf(conffile, 'roisender', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    unit = get_rfgetconf(conffile, 'system', 'unitname')
    shmname = unit + ':roisender'
    pidfile = basedir + '/roisender/pid.data'
    # Read the pids up front so the file is closed again, and strip the
    # line ending so that it does not end up inside the remote command.
    with open(pidfile, 'r') as f:
        pids = [line.strip() for line in f]
    for pid in pids:
        if not pid:
            # Blank line: nothing to kill
            continue
        cmd = 'ssh ' + roihost + ' "kill ' + pid + '; removeshm ' + shmname + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
362
363
364# Run local master
365
def run_master(conffile):
    """Launch ``rf_master_local`` on the master host through ssh.

    Creates ``<execdir_base>/master`` locally if it does not exist. The
    remote command changes into execdir_base, sets NSM2_PORT and redirects
    the master output to ``master/nsmlog.log``. The command line is printed,
    the ssh process is not waited for, and the function sleeps one second
    afterwards.
    """
    masterhost = get_rfgetconf(conffile, 'master', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    port = get_rfgetconf(conffile, 'system', 'nsmport')

    workdir = basedir + '/master'
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    cmd = ''.join([
        'ssh ', masterhost, ' "cd ', basedir, '; setenv NSM2_PORT ',
        port, '; rf_master_local ', get_configpath(conffile),
        ' > & master/nsmlog.log" ',
    ])
    print(cmd)
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
378
379
380# Stop local master
381
def stop_master(conffile):
    """Kill the local master processes.

    For every process id listed in ``<execdir_base>/master/pid.data`` an
    ssh ``kill`` is run on the master host. Each command is printed and
    waited for.

    Raises whatever open() raises when the pid file cannot be read.
    """
    masterhost = get_rfgetconf(conffile, 'master', 'ctlhost')
    basedir = get_rfgetconf(conffile, 'system', 'execdir_base')
    pidfile = basedir + '/master/pid.data'
    # Read the pids up front so the file is closed again, and strip the
    # line ending so that it does not end up inside the remote command.
    with open(pidfile, 'r') as f:
        pids = [line.strip() for line in f]
    for pid in pids:
        if not pid:
            # Blank line: nothing to kill
            continue
        cmd = 'ssh ' + masterhost + ' "kill ' + pid + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
393
394
def start_rfarm_components(conffile):
    """Start the RFARM components configured in *conffile*.

    Launch order: event processors, output server, event server, DQM
    server, ROI sender.
    """
    launchers = (
        run_eventprocessor,
        run_outputserver,
        run_eventserver,
        run_dqmserver,
        run_roisender,
    )
    for launch in launchers:
        launch(conffile)
401
402
403# Stop RFARM components
404
def stop_rfarm_components(conffile):
    """Stop the RFARM components configured in *conffile*.

    Stop order: ROI sender, DQM server (with progress messages printed
    around it), event server, output server. stop_eventprocessor() is not
    called here.
    """
    stop_roisender(conffile)

    print("stop dqmserver")
    stop_dqmserver(conffile)
    print("done")

    for halt in (stop_eventserver, stop_outputserver):
        halt(conffile)
413
414
415# Start RFARM local operation
416
def start_rfarm_local(conffile):
    """Start the RFARM components first and the local master afterwards."""
    for bring_up in (start_rfarm_components, run_master):
        bring_up(conffile)
420
421
422# Stop RFARM local operation
423
def stop_rfarm_local(conffile):
    """Stop event processors, the other RFARM components and the master.

    A progress message is printed before each of the three steps, which run
    in that order.
    """
    steps = (
        ("stopping eventprocessors", stop_eventprocessor),
        ("stopping rfarm components", stop_rfarm_components),
        ("stopping master", stop_master),
    )
    for message, shut_down in steps:
        print(message)
        shut_down(conffile)