Belle II Software development
erecoutil.py
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4
11
12import os
13import sys
14import subprocess
15import time
16
17
18# Basic Utilities
19# Get full path of the configuration file
20
def get_configpath(conffile):
    """Return the full path of the configuration file named ``conffile``.

    The path is built as ``<ERECO_CONFDIR>/<conffile>.conf`` where
    ``ERECO_CONFDIR`` is read from the environment.

    Args:
        conffile: Base name of the configuration file, without the
            ``.conf`` suffix.

    Returns:
        The path string ``<ERECO_CONFDIR>/<conffile>.conf``.

    Raises:
        SystemExit: If ``ERECO_CONFDIR`` is not set. A message is printed
            first and the exit status is 1 so that callers and shell
            scripts can detect the failure.
    """
    confdir = os.environ.get('ERECO_CONFDIR')
    # Test for None directly: comparing str(value) with 'None' would also
    # reject a directory that is literally named "None".
    if confdir is None:
        print('ERECO_CONFDIR is not defined. Exit.')
        sys.exit(1)
    return confdir + '/' + conffile + '.conf'
28
29
30# Get Configuration from config file
def get_ergetconf(conffile, item1, item2='NULL', item3='NULL'):
    """Read one value from a configuration file with the ``rfgetconf`` tool.

    Runs ``rfgetconf <config path> <item1> <item2> <item3>`` through the
    shell and returns what the command writes to its standard output.

    Args:
        conffile: Base name of the configuration file (resolved with
            get_configpath()).
        item1: First lookup key passed to ``rfgetconf``.
        item2: Second lookup key; the default ``'NULL'`` is passed through
            verbatim (presumably the tool's "unused" placeholder - confirm).
        item3: Third lookup key, same convention as ``item2``.

    Returns:
        The standard output of ``rfgetconf`` as a text string, otherwise
        unmodified (no whitespace stripping is applied).

    Raises:
        SystemExit: From get_configpath() when ``ERECO_CONFDIR`` is not set.
    """
    # get_configpath() already validates ERECO_CONFDIR, so the check is not
    # repeated here.
    cmd = ' '.join(['rfgetconf', get_configpath(conffile), item1, item2,
                    item3])
    # universal_newlines=True makes the pipes text streams, so the result is
    # a str that callers can concatenate into command lines.
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE, universal_newlines=True)
    # communicate() drains both pipes while waiting for the child; calling
    # wait() first can deadlock once a pipe buffer fills up.
    output, _ = p.communicate()
    return output
45
46
47# print("waiting")
48# confout = p.stdout.read()
49
50# NSMD related utilities
51# Run NSMD
52
def run_nsmd(nsmdir, port, nsmhost):
    """Launch ``nsmd2`` on ``nsmhost`` through ssh.

    Args:
        nsmdir: Base directory; the per-host directory ``<nsmdir>/<nsmhost>``
            is used as working directory and as ``NSMLOGDIR``.
        port: Port given to ``nsmd2`` for both ``-p`` and ``-s`` (a string).
        nsmhost: Host to ssh to; also passed to ``nsmd2 -h``.
    """
    # Make sure the per-host logging directory exists. The check runs on the
    # machine executing this script (presumably the directory is on a file
    # system shared with nsmhost - confirm).
    hostdir = nsmdir + '/' + nsmhost
    if not os.path.exists(hostdir):
        os.mkdir(hostdir)

    daemon = ''.join(['nsmd2 -f -p ', port, ' -s ', port, ' -h ', nsmhost])
    # setenv is csh-family syntax for setting the environment variable.
    remote = ''.join(['cd ', hostdir, '; setenv NSMLOGDIR ', hostdir, ';',
                      daemon])
    cmd = 'ssh ' + nsmhost + ' "' + remote + '"'
    print(cmd)
    # The ssh process is not waited for; only a short pause follows before
    # returning to the caller.
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
66
67
68# p.wait()
69
70# Kill NSMD
71
def kill_nsmd(port, nsmhost):
    """Kill the ``nsmd2`` processes on ``nsmhost`` that match ``port``.

    The remote command lists processes named ``nsmd2`` with ``ps -fC``,
    keeps the lines containing the text ``port`` and prints their second
    field, which is then used as the process id for a remote ``kill``.

    Args:
        port: Port text to grep for in the ``ps`` listing (a string).
        nsmhost: Host on which the processes are looked up and killed.

    Raises:
        ValueError: If a non-blank line of the listing is not an integer.
    """
    # '\\$2' yields a backslash followed by '$2' so that the local shell
    # hands '$2' to the remote awk unexpanded.
    cmd = 'ssh ' + nsmhost + ' "ps -fC nsmd2 | grep ' + port \
        + "| awk '{print \\$2}' \""
    # Read the pid list from a pipe instead of a fixed-name scratch file in
    # the current directory: no file handle is leaked and concurrent
    # invocations cannot overwrite each other's list.
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                         universal_newlines=True)
    listing, _ = p.communicate()
    for line in listing.splitlines():
        if not line.strip():
            # A blank line carries no pid; skip it rather than fail in int().
            continue
        pid = int(line)
        if pid > 0:
            killcmd = 'ssh ' + nsmhost + ' "kill ' + str(pid) + '"'
            k = subprocess.Popen(killcmd, shell=True)
            k.wait()
88
89
90# Start NSMD on all nodes
91
def start_nsmd(conffile):
    """Start nsmd on every node named in the configuration.

    nsmd is started on the control node, on the event server node and on
    each event processor node whose two-digit id does not occur in the
    configured bad list.

    Args:
        conffile: Base name of the configuration file.
    """
    # Settings shared by all nodes
    nsmdir = get_ergetconf(conffile, 'system', 'nsmdir_base')
    port = get_ergetconf(conffile, 'system', 'nsmport')

    # Control node
    ctlhost = get_ergetconf(conffile, 'master', 'ctlhost')
    run_nsmd(nsmdir, port, ctlhost)
    print('nsmd on %s started' % ctlhost)

    # Event server node, unless its name is contained in the control host
    # name (in particular when both are the same host)
    evshost = get_ergetconf(conffile, 'distributor', 'ctlhost')
    if evshost not in ctlhost:
        run_nsmd(nsmdir, port, evshost)
        print('nsmd on %s started' % evshost)

    # Event processor nodes
    nnodes = int(get_ergetconf(conffile, 'processor', 'nnodes'))
    first_id = int(get_ergetconf(conffile, 'processor', 'idbase'))
    badlist = get_ergetconf(conffile, 'processor', 'badlist')
    evphostbase = get_ergetconf(conffile, 'processor', 'ctlhostbase')
    for number in range(first_id, first_id + nnodes):
        nodeid = '%2.2d' % number
        if nodeid in badlist:
            # Node is listed as bad: leave it alone
            continue
        evphost = evphostbase + nodeid
        run_nsmd(nsmdir, port, evphost)
        print('nsmd on %s started' % evphost)
119
120
def stop_nsmd(conffile):
    """Stop nsmd on every node named in the configuration.

    nsmd is killed on the control node, on the event server node and on
    each event processor node whose two-digit id does not occur in the
    configured bad list.

    Args:
        conffile: Base name of the configuration file.
    """
    port = get_ergetconf(conffile, 'system', 'nsmport')

    # Control node
    ctlhost = get_ergetconf(conffile, 'master', 'ctlhost')
    kill_nsmd(port, ctlhost)
    print('nsmd on %s stopped' % ctlhost)

    # Event server node, unless its name is contained in the control host
    # name (in particular when both are the same host)
    evshost = get_ergetconf(conffile, 'distributor', 'ctlhost')
    if evshost not in ctlhost:
        kill_nsmd(port, evshost)
        print('nsmd on %s stopped' % evshost)

    # Event processor nodes
    nnodes = int(get_ergetconf(conffile, 'processor', 'nnodes'))
    first_id = int(get_ergetconf(conffile, 'processor', 'idbase'))
    badlist = get_ergetconf(conffile, 'processor', 'badlist')
    evphostbase = get_ergetconf(conffile, 'processor', 'ctlhostbase')
    for number in range(first_id, first_id + nnodes):
        nodeid = '%2.2d' % number
        if nodeid in badlist:
            # Node is listed as bad: leave it alone
            continue
        evphost = evphostbase + nodeid
        kill_nsmd(port, evphost)
        print('nsmd on %s stopped' % evphost)
146
147
148# RFARM server operations
149# Run eventserver
150
def run_distributor(conffile):
    """Start ``ereco_distributor`` over ssh on the distributor control host.

    The process is started in the configured ``execdir_base`` directory with
    ``NSM2_PORT`` set, and its output is redirected to
    ``distributor/nsmlog.log`` below that directory.

    Args:
        conffile: Base name of the configuration file.
    """
    evshost = get_ergetconf(conffile, 'distributor', 'ctlhost')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    port = get_ergetconf(conffile, 'system', 'nsmport')

    # Create the directory that receives the log file if it is missing
    workdir = basedir + '/distributor'
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    pieces = [
        'ssh ', evshost,
        ' "cd ', basedir,
        '; setenv NSM2_PORT ', port,
        '; ereco_distributor ', get_configpath(conffile),
        ' > & distributor/nsmlog.log" ',
    ]
    cmd = ''.join(pieces)
    print(cmd)
    # Started without waiting for completion; pause briefly before returning
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
163
164
165# Stop eventserver
166
def stop_distributor(conffile):
    """Stop the distributor and remove its ring buffer and shared memory.

    Sends ``RC_ABORT`` to the ``distributor`` node with ``rfcommand``, then,
    for every pid recorded in ``<execdir_base>/distributor/pid.data``, runs
    ``kill``, ``removerb`` and ``removeshm`` on the distributor host via ssh.

    Args:
        conffile: Base name of the configuration file.

    Raises:
        IOError/OSError: If the pid file cannot be opened.
    """
    evshost = get_ergetconf(conffile, 'distributor', 'ctlhost')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    unit = get_ergetconf(conffile, 'system', 'unitname')
    ringbuf = get_ergetconf(conffile, 'distributor', 'ringbuffer')
    rbufname = unit + ':' + ringbuf
    shmname = unit + ':distributor'
    p = subprocess.Popen('rfcommand ' + conffile +
                         ' distributor RC_ABORT', shell=True)
    p.wait()
    pidfile = basedir + '/distributor/pid.data'
    # Read the pid file inside a with-block so the handle is closed, and
    # strip each line: a trailing newline would otherwise end up in the
    # middle of the remote command line.
    with open(pidfile, 'r') as f:
        pids = [line.strip() for line in f]
    for pid in pids:
        if not pid:
            # Blank line: nothing to kill
            continue
        cmd = 'ssh ' + evshost + ' "kill ' + pid + '; removerb ' + rbufname \
            + "; removeshm " + shmname + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
184
185
186# Start event processor
187
def run_eventprocessor(conffile):
    """Start ``ereco_eventprocessor`` over ssh on every enabled node.

    Node ids run from the configured ``idbase`` for ``nnodes`` entries and
    are formatted as two digits; ids that occur in the configured bad list
    are skipped. Each process is started in ``execdir_base`` with
    ``NSM2_PORT`` set and its output redirected to
    ``evp_<nodebase><id>/nsmlog.log``.

    Args:
        conffile: Base name of the configuration file.
    """
    hostbase = get_ergetconf(conffile, 'processor', 'ctlhostbase')
    nodebase = get_ergetconf(conffile, 'processor', 'nodebase')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    port = get_ergetconf(conffile, 'system', 'nsmport')
    nnodes = int(get_ergetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_ergetconf(conffile, 'processor', 'idbase'))
    badlist = get_ergetconf(conffile, 'processor', 'badlist')
    # The configuration path is the same for every node; resolve it once.
    confpath = get_configpath(conffile)

    for i in range(procid, procid + nnodes):
        nodeid = '%2.2d' % i
        if badlist.find(nodeid) != -1:
            # Node is listed as bad: do not start anything on it
            continue
        evphost = hostbase + nodeid
        nodename = nodebase + nodeid
        # Create the directory that receives the log file if it is missing
        nodedir = basedir + '/evp_' + nodename
        if not os.path.exists(nodedir):
            os.mkdir(nodedir)
        cmd = 'ssh ' + evphost + ' "cd ' + basedir + '; setenv NSM2_PORT ' \
            + port + '; ereco_eventprocessor ' + confpath \
            + ' > & evp_' + nodename + '/nsmlog.log" '
        print(cmd)
        # Started without waiting for completion; pause briefly before the
        # next node
        subprocess.Popen(cmd, shell=True)
        time.sleep(1)
211
212
213# Stop event processor
214
def stop_eventprocessor(conffile):
    """Stop the event processor on every enabled node and clean up its IPC.

    For each node id not in the configured bad list, ``RC_ABORT`` is sent to
    ``evp_<nodebase><id>`` with ``rfcommand``. Then, for every pid recorded
    in ``<execdir_base>/evp_<nodebase><id>/pid.data``, the node host runs
    ``kill``, ``removerb`` for the collector's input and output ring
    buffers, ``removeshm`` and ``clear_basf2_ipc`` via ssh.

    Args:
        conffile: Base name of the configuration file.

    Raises:
        IOError/OSError: If a pid file cannot be opened.
    """
    hostbase = get_ergetconf(conffile, 'processor', 'ctlhostbase')
    nodebase = get_ergetconf(conffile, 'processor', 'nodebase')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    nnodes = int(get_ergetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_ergetconf(conffile, 'processor', 'idbase'))
    badlist = get_ergetconf(conffile, 'processor', 'badlist')

    unit = get_ergetconf(conffile, 'system', 'unitname')
    rbufin = get_ergetconf(conffile, 'collector', 'ringbufin')
    rbufout = get_ergetconf(conffile, 'collector', 'ringbufout')
    rbufinname = unit + ':' + rbufin
    rbufoutname = unit + ':' + rbufout

    for i in range(procid, procid + nnodes):
        nodeid = '%2.2d' % i
        if badlist.find(nodeid) != -1:
            # Node is listed as bad: nothing to stop there
            continue
        evphost = hostbase + nodeid
        nodename = 'evp_' + nodebase + nodeid
        shmname = unit + ':' + nodename
        print(shmname)
        p = subprocess.Popen('rfcommand ' + conffile + ' ' + nodename +
                             ' RC_ABORT', shell=True)
        p.wait()
        pidfile = basedir + '/' + nodename + '/pid.data'
        # Read the pid file inside a with-block so the handle is closed,
        # and strip each line: a trailing newline would otherwise end up in
        # the middle of the remote command line.
        with open(pidfile, 'r') as f:
            pids = [line.strip() for line in f]
        for pid in pids:
            if not pid:
                # Blank line: nothing to kill
                continue
            cmd = 'ssh ' + evphost + ' "kill ' + pid + '; removerb ' \
                + rbufinname + '; removerb ' + rbufoutname \
                + '; removeshm ' + shmname + '; clear_basf2_ipc"'
            print(cmd)
            p = subprocess.Popen(cmd, shell=True)
            p.wait()
251
252
253# Run dqmserver
254
def run_dqmserver(conffile):
    """Start ``rf_dqmserver`` over ssh on the DQM server control host.

    The process is started in the configured ``execdir_base`` directory with
    ``NSM2_PORT`` set, and its output is redirected to
    ``dqmserver/nsmlog.log`` below that directory.

    Args:
        conffile: Base name of the configuration file.
    """
    dqmhost = get_ergetconf(conffile, 'dqmserver', 'ctlhost')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    port = get_ergetconf(conffile, 'system', 'nsmport')

    # Create the directory that receives the log file if it is missing
    workdir = basedir + '/dqmserver'
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    pieces = [
        'ssh ', dqmhost,
        ' "cd ', basedir,
        '; setenv NSM2_PORT ', port,
        '; rf_dqmserver ', get_configpath(conffile),
        ' > & dqmserver/nsmlog.log" ',
    ]
    cmd = ''.join(pieces)
    print(cmd)
    # Started without waiting for completion; pause briefly before returning
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
267
268
269# Stop dqmserver
270
def stop_dqmserver(conffile):
    """Stop the DQM server.

    Sends ``RC_ABORT`` to the ``dqmserver`` node with ``rfcommand``, then
    kills every pid recorded in ``<execdir_base>/dqmserver/pid.data`` on the
    DQM server host via ssh.

    Args:
        conffile: Base name of the configuration file.

    Raises:
        IOError/OSError: If the pid file cannot be opened.
    """
    dqmhost = get_ergetconf(conffile, 'dqmserver', 'ctlhost')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    p = subprocess.Popen('rfcommand ' + conffile + ' dqmserver RC_ABORT',
                         shell=True)
    p.wait()
    pidfile = basedir + '/dqmserver/pid.data'
    # Read the pid file inside a with-block so the handle is closed, and
    # strip each line so no newline is embedded in the remote command.
    with open(pidfile, 'r') as f:
        pids = [line.strip() for line in f]
    for pid in pids:
        if not pid:
            # Blank line: nothing to kill
            continue
        cmd = 'ssh ' + dqmhost + ' "kill ' + pid + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
282
283# Run eventsampler
284
285
def run_eventsampler(conffile):
    """Start ``ereco_eventsampler`` over ssh on the event sampler host.

    The process is started in the configured ``execdir_base`` directory with
    ``NSM2_PORT`` set, and its output is redirected to
    ``sampler/nsmlog.log`` below that directory.

    Args:
        conffile: Base name of the configuration file.
    """
    samplerhost = get_ergetconf(conffile, 'eventsampler', 'ctlhost')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    port = get_ergetconf(conffile, 'system', 'nsmport')

    # Create the directory that receives the log file if it is missing
    workdir = basedir + '/sampler'
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    pieces = [
        'ssh ', samplerhost,
        ' "cd ', basedir,
        '; setenv NSM2_PORT ', port,
        '; ereco_eventsampler ', get_configpath(conffile),
        ' > & sampler/nsmlog.log" ',
    ]
    cmd = ''.join(pieces)
    print(cmd)
    # Started without waiting for completion; pause briefly before returning
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
298
299
300# Stop eventsampler
301
def stop_eventsampler(conffile):
    """Stop the event sampler.

    Sends ``RC_ABORT`` to the ``sampler`` node with ``rfcommand``, then kills
    every pid recorded in ``<execdir_base>/sampler/pid.data`` on the event
    sampler host via ssh.

    Args:
        conffile: Base name of the configuration file.

    Raises:
        IOError/OSError: If the pid file cannot be opened.
    """
    samplerhost = get_ergetconf(conffile, 'eventsampler', 'ctlhost')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    p = subprocess.Popen('rfcommand ' + conffile + ' sampler RC_ABORT',
                         shell=True)
    p.wait()
    pidfile = basedir + '/sampler/pid.data'
    # Read the pid file inside a with-block so the handle is closed, and
    # strip each line so no newline is embedded in the remote command.
    with open(pidfile, 'r') as f:
        pids = [line.strip() for line in f]
    for pid in pids:
        if not pid:
            # Blank line: nothing to kill
            continue
        cmd = 'ssh ' + samplerhost + ' "kill ' + pid + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
313
314
315# Run local master
316
def run_master(conffile):
    """Start ``ereco_master_local`` over ssh on the master control host.

    The process is started in the configured ``execdir_base`` directory with
    ``NSM2_PORT`` set, and its output is redirected to ``master/nsmlog.log``
    below that directory.

    Args:
        conffile: Base name of the configuration file.
    """
    masterhost = get_ergetconf(conffile, 'master', 'ctlhost')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    port = get_ergetconf(conffile, 'system', 'nsmport')

    # Create the directory that receives the log file if it is missing
    workdir = basedir + '/master'
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    pieces = [
        'ssh ', masterhost,
        ' "cd ', basedir,
        '; setenv NSM2_PORT ', port,
        '; ereco_master_local ', get_configpath(conffile),
        ' > & master/nsmlog.log" ',
    ]
    cmd = ''.join(pieces)
    print(cmd)
    # Started without waiting for completion; pause briefly before returning
    subprocess.Popen(cmd, shell=True)
    time.sleep(1)
329
330
331# Stop local master
332
def stop_master(conffile):
    """Stop the local master.

    Kills every pid recorded in ``<execdir_base>/master/pid.data`` on the
    master control host via ssh. No ``rfcommand`` request is sent first.

    Args:
        conffile: Base name of the configuration file.

    Raises:
        IOError/OSError: If the pid file cannot be opened.
    """
    masterhost = get_ergetconf(conffile, 'master', 'ctlhost')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    pidfile = basedir + '/master/pid.data'
    # Read the pid file inside a with-block so the handle is closed, and
    # strip each line so no newline is embedded in the remote command.
    with open(pidfile, 'r') as f:
        pids = [line.strip() for line in f]
    for pid in pids:
        if not pid:
            # Blank line: nothing to kill
            continue
        cmd = 'ssh ' + masterhost + ' "kill ' + pid + '"'
        print(cmd)
        p = subprocess.Popen(cmd, shell=True)
        p.wait()
344
345
def start_ereco_components(conffile):
    """Start the ERECO components in order.

    Order: event processors, distributor, DQM server, event sampler.

    Args:
        conffile: Base name of the configuration file.
    """
    launchers = (
        run_eventprocessor,
        run_distributor,
        run_dqmserver,
        run_eventsampler,
    )
    for launch in launchers:
        launch(conffile)
351
352
353# Stop ERECO components
354
def stop_ereco_components(conffile):
    """Stop the ERECO components in order.

    Order: event sampler, DQM server, distributor, event processors.

    Args:
        conffile: Base name of the configuration file.
    """
    stoppers = (
        stop_eventsampler,
        stop_dqmserver,
        stop_distributor,
        stop_eventprocessor,
    )
    for stop in stoppers:
        stop(conffile)
360
361
362# Start ERECO local operation
363
def start_ereco_local(conffile):
    """Start ERECO local operation: all components first, then the master.

    Args:
        conffile: Base name of the configuration file.
    """
    for step in (start_ereco_components, run_master):
        step(conffile)
367
368
369# Stop ERECO local operation
370
def stop_ereco_local(conffile):
    """Stop ERECO local operation: all components first, then the master.

    Args:
        conffile: Base name of the configuration file.
    """
    for step in (stop_ereco_components, stop_master):
        step(conffile)