def get_configpath(conffile):
    """Return the path of the configuration file named *conffile*.

    The path is built as ``$ERECO_CONFDIR/<conffile>.conf``.

    Args:
        conffile: Configuration name without the ``.conf`` suffix.

    Returns:
        The configuration file path as a string.

    Raises:
        SystemExit: If ``ERECO_CONFDIR`` is not set in the environment.
    """
    confdir = os.environ.get('ERECO_CONFDIR')
    if confdir is None:
        print('ERECO_CONFDIR is not defined. Exit.')
        # NOTE(review): the statement following the message is not visible in
        # the reviewed listing; terminating is inferred from the message text
        # and the exit status is an assumption - confirm.
        raise SystemExit(1)
    return confdir + '/' + conffile + '.conf'
def get_ergetconf(conffile, item1, item2='NULL', item3='NULL'):
    """Look up one configuration value with the external ``rfgetconf`` tool.

    Runs ``rfgetconf <config path> <item1> <item2> <item3>`` through the
    shell and returns what the tool printed.

    Args:
        conffile: Configuration name, resolved with ``get_configpath``.
        item1: First lookup key passed to ``rfgetconf``.
        item2: Second lookup key; the literal ``'NULL'`` when not given.
        item3: Third lookup key; the literal ``'NULL'`` when not given.

    Returns:
        The standard output of ``rfgetconf`` as text, with surrounding
        whitespace stripped.
    """
    # get_configpath() already handles an undefined ERECO_CONFDIR, so the
    # environment is not checked a second time here.
    # NOTE(review): the continuation line that ends the command is not visible
    # in the reviewed listing; appending item3 is inferred from the signature.
    cmd = ' '.join(['rfgetconf', get_configpath(conffile),
                    item1, item2, item3])
    proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE, universal_newlines=True)
    # communicate() drains both pipes while waiting, so the child cannot stall
    # on a full stderr pipe; universal_newlines makes the result text, which
    # callers concatenate with str.
    output, _ = proc.communicate()
    return output.strip()
def run_nsmd(nsmdir, port, nsmhost):
    """Start ``nsmd2`` on *nsmhost* over ssh without waiting for it.

    The local directory ``<nsmdir>/<nsmhost>`` is created when missing. The
    remote command changes into that path, sets ``NSMLOGDIR`` to it and runs
    ``nsmd2``.

    Args:
        nsmdir: Base directory holding one sub-directory per host.
        port: Port number as a string, passed to both ``-p`` and ``-s``.
        nsmhost: Host to ssh to; also passed to ``nsmd2 -h``.
    """
    logdir = nsmdir + '/' + nsmhost
    if not os.path.exists(logdir):
        os.mkdir(logdir)

    # NOTE(review): the end of this command is not visible in the reviewed
    # listing. Only what the visible part requires is restored here: the host
    # argument for "-h" and the closing double quote. Any further nsmd2
    # options or trailing shell syntax on the elided line must be re-added.
    nsmd = 'nsmd2 -f -p ' + port + ' -s ' + port + ' -h ' + nsmhost
    # "setenv" is csh syntax - presumably the remote login shell is csh/tcsh.
    cmd = ('ssh ' + nsmhost + ' "cd ' + logdir
           + '; setenv NSMLOGDIR ' + logdir + ';' + nsmd + '"')
    subprocess.Popen(cmd, shell=True)
def kill_nsmd(port, nsmhost):
    """Kill the ``nsmd2`` processes on *nsmhost* whose ``ps`` line matches *port*.

    The process ids are obtained by running ``ps -fC nsmd2 | grep <port>``
    on the remote host and taking the second column; one ``ssh ... kill``
    is then started per process id.

    Args:
        port: Port number as a string, used as the ``grep`` pattern.
        nsmhost: Host on which the processes are listed and killed.
    """
    # The "$" is backslash-escaped for the local shell (it sits inside double
    # quotes); the backslash itself is doubled so Python does not see an
    # invalid "\$" escape sequence.
    list_cmd = 'ssh ' + nsmhost + ' "ps -fC nsmd2 | grep ' + port \
        + "| awk '{print \\$2}' \""
    # The listing is captured from the pipe instead of a shared "temp.pid"
    # scratch file, and communicate() guarantees the listing is complete
    # before it is read.
    lister = subprocess.Popen(list_cmd, shell=True, stdout=subprocess.PIPE,
                              universal_newlines=True)
    listing, _ = lister.communicate()

    for line in listing.splitlines():
        pid = line.strip()
        if not pid.isdigit():
            # Skip blank or unexpected lines rather than passing them to kill.
            continue
        cmd = 'ssh ' + nsmhost + ' "kill ' + pid + '"'
        subprocess.Popen(cmd, shell=True)
def start_nsmd(conffile):
    """Start ``nsmd2`` on every host named in the configuration.

    Daemons are started, in this order, on the master control host, on the
    distributor control host (unless its name is contained in the master
    host name), and on each processor host ``<ctlhostbase><nodeid>`` whose
    node id does not occur in the configured bad list.

    Args:
        conffile: Configuration name passed to ``get_ergetconf``.
    """
    nsmdir = get_ergetconf(conffile, 'system', 'nsmdir_base')
    port = get_ergetconf(conffile, 'system', 'nsmport')

    # Master control host.
    ctlhost = get_ergetconf(conffile, 'master', 'ctlhost')
    run_nsmd(nsmdir, port, ctlhost)
    print('nsmd on %s started' % ctlhost)

    # Distributor host, only when it is not the host already handled above.
    evshost = get_ergetconf(conffile, 'distributor', 'ctlhost')
    if evshost not in ctlhost:
        run_nsmd(nsmdir, port, evshost)
        print('nsmd on %s started' % evshost)

    # Processor hosts.
    nnodes = int(get_ergetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_ergetconf(conffile, 'processor', 'idbase'))
    badlist = get_ergetconf(conffile, 'processor', 'badlist')
    evphostbase = get_ergetconf(conffile, 'processor', 'ctlhostbase')
    for i in range(procid, procid + nnodes):
        # NOTE(review): the line binding nodeid is not visible in the reviewed
        # listing; a zero-padded two-digit string is assumed - confirm.
        nodeid = '%2.2d' % i
        if nodeid not in badlist:
            evphost = evphostbase + nodeid
            run_nsmd(nsmdir, port, evphost)
            print('nsmd on %s started' % evphost)
def stop_nsmd(conffile):
    """Stop ``nsmd2`` on every host named in the configuration.

    ``kill_nsmd`` is called, in this order, for the master control host, the
    distributor control host (unless its name is contained in the master
    host name), and each processor host ``<ctlhostbase><nodeid>`` whose node
    id does not occur in the configured bad list.

    Args:
        conffile: Configuration name passed to ``get_ergetconf``.
    """
    port = get_ergetconf(conffile, 'system', 'nsmport')

    # Master control host.
    ctlhost = get_ergetconf(conffile, 'master', 'ctlhost')
    kill_nsmd(port, ctlhost)
    print('nsmd on %s stopped' % ctlhost)

    # Distributor host, only when it is not the host already handled above.
    evshost = get_ergetconf(conffile, 'distributor', 'ctlhost')
    if evshost not in ctlhost:
        kill_nsmd(port, evshost)
        print('nsmd on %s stopped' % evshost)

    # Processor hosts.
    nnodes = int(get_ergetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_ergetconf(conffile, 'processor', 'idbase'))
    badlist = get_ergetconf(conffile, 'processor', 'badlist')
    evphostbase = get_ergetconf(conffile, 'processor', 'ctlhostbase')
    for i in range(procid, procid + nnodes):
        # NOTE(review): the line binding nodeid is not visible in the reviewed
        # listing; a zero-padded two-digit string is assumed - confirm.
        nodeid = '%2.2d' % i
        if nodeid not in badlist:
            evphost = evphostbase + nodeid
            kill_nsmd(port, evphost)
            print('nsmd on %s stopped' % evphost)
def run_distributor(conffile):
    """Launch ``ereco_distributor`` on the distributor control host over ssh.

    Reads the host, the execution base directory and the NSM port from the
    configuration, creates ``<execdir_base>/distributor`` locally when it is
    missing, and starts the remote command without waiting for it. The
    remote output is redirected to ``distributor/nsmlog.log``.

    Args:
        conffile: Configuration name passed to ``get_ergetconf`` and
            ``get_configpath``.
    """
    host = get_ergetconf(conffile, 'distributor', 'ctlhost')
    workdir = get_ergetconf(conffile, 'system', 'execdir_base')
    nsmport = get_ergetconf(conffile, 'system', 'nsmport')

    logdir = workdir + '/distributor'
    if not os.path.exists(logdir):
        os.mkdir(logdir)

    # "setenv" and "> &" are csh syntax - presumably the remote login shell
    # is csh/tcsh.
    pieces = [
        'ssh ', host,
        ' "cd ', workdir,
        '; setenv NSM2_PORT ', nsmport,
        '; ereco_distributor ', get_configpath(conffile),
        ' > & distributor/nsmlog.log" ',
    ]
    subprocess.Popen(''.join(pieces), shell=True)
def stop_distributor(conffile):
    """Abort the distributor and clean up after it on its control host.

    Sends ``RC_ABORT`` to the ``distributor`` node with ``rfcommand``, then,
    for every process id listed in ``<execdir_base>/distributor/pid.data``,
    starts an ssh command that kills the process and runs ``removerb`` and
    ``removeshm`` on ``<unitname>:<ringbuffer>`` and
    ``<unitname>:distributor``.

    Args:
        conffile: Configuration name passed to ``get_ergetconf`` and
            ``rfcommand``.
    """
    evshost = get_ergetconf(conffile, 'distributor', 'ctlhost')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    unit = get_ergetconf(conffile, 'system', 'unitname')
    ringbuf = get_ergetconf(conffile, 'distributor', 'ringbuffer')
    rbufname = unit + ':' + ringbuf
    shmname = unit + ':distributor'

    abort = subprocess.Popen('rfcommand ' + conffile
                             + ' distributor RC_ABORT', shell=True)
    # NOTE(review): one line is elided here in the reviewed listing; waiting
    # for the abort command before killing is an assumption - confirm.
    abort.wait()

    pidfile = basedir + '/distributor/pid.data'
    # Read the pids up front so the file is closed promptly, and strip the
    # line endings so no newline ends up inside the remote command.
    with open(pidfile, 'r') as handle:
        pids = [line.strip() for line in handle if line.strip()]
    for pid in pids:
        cmd = 'ssh ' + evshost + ' "kill ' + pid + '; removerb ' + rbufname \
            + "; removeshm " + shmname + '"'
        subprocess.Popen(cmd, shell=True)
def run_eventprocessor(conffile):
    """Launch ``ereco_eventprocessor`` on every usable processor host.

    For each node id in ``[idbase, idbase + nnodes)`` that does not occur in
    the configured bad list, creates ``<execdir_base>/evp_<nodebase><id>``
    locally when missing and starts the remote command on
    ``<ctlhostbase><id>`` without waiting for it. The remote output is
    redirected to ``evp_<nodebase><id>/nsmlog.log``.

    Args:
        conffile: Configuration name passed to ``get_ergetconf`` and
            ``get_configpath``.
    """
    hostbase = get_ergetconf(conffile, 'processor', 'ctlhostbase')
    nodebase = get_ergetconf(conffile, 'processor', 'nodebase')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    port = get_ergetconf(conffile, 'system', 'nsmport')
    nnodes = int(get_ergetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_ergetconf(conffile, 'processor', 'idbase'))
    badlist = get_ergetconf(conffile, 'processor', 'badlist')
    # The config path does not depend on the node, so resolve it once.
    confpath = get_configpath(conffile)

    for i in range(procid, procid + nnodes):
        # NOTE(review): the line binding nodeid is not visible in the reviewed
        # listing; a zero-padded two-digit string is assumed - confirm.
        nodeid = '%2.2d' % i
        if nodeid in badlist:
            continue
        evphost = hostbase + nodeid
        nodename = nodebase + nodeid
        logdir = basedir + '/evp_' + nodename
        if not os.path.exists(logdir):
            os.mkdir(logdir)
        # "setenv" and "> &" are csh syntax - presumably the remote login
        # shell is csh/tcsh.
        cmd = 'ssh ' + evphost + ' "cd ' + basedir + '; setenv NSM2_PORT ' \
            + port + '; ereco_eventprocessor ' + confpath \
            + ' > & evp_' + nodename + '/nsmlog.log" '
        subprocess.Popen(cmd, shell=True)
def stop_eventprocessor(conffile):
    """Abort every usable event processor and clean up on its host.

    For each node id in ``[idbase, idbase + nnodes)`` that does not occur in
    the configured bad list, sends ``RC_ABORT`` to node
    ``evp_<nodebase><id>`` with ``rfcommand``. Then, for every process id
    listed in ``<execdir_base>/evp_<nodebase><id>/pid.data``, starts an ssh
    command on ``<ctlhostbase><id>`` that kills the process, runs
    ``removerb`` on both ring buffers, runs ``removeshm`` on the node's
    shared memory name, and runs ``clear_basf2_ipc``.

    Args:
        conffile: Configuration name passed to ``get_ergetconf`` and
            ``rfcommand``.
    """
    hostbase = get_ergetconf(conffile, 'processor', 'ctlhostbase')
    nodebase = get_ergetconf(conffile, 'processor', 'nodebase')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')
    nnodes = int(get_ergetconf(conffile, 'processor', 'nnodes'))
    procid = int(get_ergetconf(conffile, 'processor', 'idbase'))
    badlist = get_ergetconf(conffile, 'processor', 'badlist')

    unit = get_ergetconf(conffile, 'system', 'unitname')
    # The ring buffer names are read from the 'collector' section.
    rbufin = get_ergetconf(conffile, 'collector', 'ringbufin')
    rbufout = get_ergetconf(conffile, 'collector', 'ringbufout')
    rbufinname = unit + ':' + rbufin
    rbufoutname = unit + ':' + rbufout

    for i in range(procid, procid + nnodes):
        # NOTE(review): the line binding nodeid is not visible in the reviewed
        # listing; a zero-padded two-digit string is assumed - confirm.
        nodeid = '%2.2d' % i
        if nodeid in badlist:
            continue
        evphost = hostbase + nodeid
        nodename = 'evp_' + nodebase + nodeid
        shmname = unit + ':' + nodename

        abort = subprocess.Popen('rfcommand ' + conffile + ' ' + nodename
                                 + ' RC_ABORT', shell=True)
        # NOTE(review): one line is elided here in the reviewed listing;
        # waiting for the abort command before killing is an assumption.
        abort.wait()

        pidfile = basedir + '/' + nodename + '/pid.data'
        # Read the pids up front so the file is closed promptly, and strip
        # the line endings so no newline ends up inside the remote command.
        with open(pidfile, 'r') as handle:
            pids = [line.strip() for line in handle if line.strip()]
        for pid in pids:
            cmd = 'ssh ' + evphost + ' "kill ' + pid + '; removerb ' \
                + rbufinname + '; removerb ' + rbufoutname \
                + '; removeshm ' + shmname + '; clear_basf2_ipc"'
            subprocess.Popen(cmd, shell=True)
def run_dqmserver(conffile):
    """Launch ``rf_dqmserver`` on the DQM server control host over ssh.

    Reads the host, the execution base directory and the NSM port from the
    configuration, creates ``<execdir_base>/dqmserver`` locally when it is
    missing, and starts the remote command without waiting for it. The
    remote output is redirected to ``dqmserver/nsmlog.log``.

    Args:
        conffile: Configuration name passed to ``get_ergetconf`` and
            ``get_configpath``.
    """
    host = get_ergetconf(conffile, 'dqmserver', 'ctlhost')
    workdir = get_ergetconf(conffile, 'system', 'execdir_base')
    nsmport = get_ergetconf(conffile, 'system', 'nsmport')

    logdir = workdir + '/dqmserver'
    if not os.path.exists(logdir):
        os.mkdir(logdir)

    # "setenv" and "> &" are csh syntax - presumably the remote login shell
    # is csh/tcsh.
    pieces = [
        'ssh ', host,
        ' "cd ', workdir,
        '; setenv NSM2_PORT ', nsmport,
        '; rf_dqmserver ', get_configpath(conffile),
        ' > & dqmserver/nsmlog.log" ',
    ]
    subprocess.Popen(''.join(pieces), shell=True)
def stop_dqmserver(conffile):
    """Abort the DQM server and kill its processes on the control host.

    Sends ``RC_ABORT`` to the ``dqmserver`` node with ``rfcommand``, then
    starts one ``ssh ... kill`` for every process id listed in
    ``<execdir_base>/dqmserver/pid.data``.

    Args:
        conffile: Configuration name passed to ``get_ergetconf`` and
            ``rfcommand``.
    """
    dqmhost = get_ergetconf(conffile, 'dqmserver', 'ctlhost')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')

    abort = subprocess.Popen('rfcommand ' + conffile + ' dqmserver RC_ABORT',
                             shell=True)
    # NOTE(review): one line is elided here in the reviewed listing; waiting
    # for the abort command before killing is an assumption - confirm.
    abort.wait()

    pidfile = basedir + '/dqmserver/pid.data'
    # Read the pids up front so the file is closed promptly, and strip the
    # line endings so no newline ends up inside the remote command.
    with open(pidfile, 'r') as handle:
        pids = [line.strip() for line in handle if line.strip()]
    for pid in pids:
        cmd = 'ssh ' + dqmhost + ' "kill ' + pid + '"'
        subprocess.Popen(cmd, shell=True)
def run_eventsampler(conffile):
    """Launch ``ereco_eventsampler`` on the event sampler control host.

    Reads the host, the execution base directory and the NSM port from the
    configuration, creates ``<execdir_base>/sampler`` locally when it is
    missing, and starts the remote command over ssh without waiting for it.
    The remote output is redirected to ``sampler/nsmlog.log``.

    Args:
        conffile: Configuration name passed to ``get_ergetconf`` and
            ``get_configpath``.
    """
    host = get_ergetconf(conffile, 'eventsampler', 'ctlhost')
    workdir = get_ergetconf(conffile, 'system', 'execdir_base')
    nsmport = get_ergetconf(conffile, 'system', 'nsmport')

    logdir = workdir + '/sampler'
    if not os.path.exists(logdir):
        os.mkdir(logdir)

    # "setenv" and "> &" are csh syntax - presumably the remote login shell
    # is csh/tcsh.
    pieces = [
        'ssh ', host,
        ' "cd ', workdir,
        '; setenv NSM2_PORT ', nsmport,
        '; ereco_eventsampler ', get_configpath(conffile),
        ' > & sampler/nsmlog.log" ',
    ]
    subprocess.Popen(''.join(pieces), shell=True)
def stop_eventsampler(conffile):
    """Abort the event sampler and kill its processes on the control host.

    Sends ``RC_ABORT`` to the ``sampler`` node with ``rfcommand``, then
    starts one ``ssh ... kill`` for every process id listed in
    ``<execdir_base>/sampler/pid.data``.

    Args:
        conffile: Configuration name passed to ``get_ergetconf`` and
            ``rfcommand``.
    """
    samplerhost = get_ergetconf(conffile, 'eventsampler', 'ctlhost')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')

    abort = subprocess.Popen('rfcommand ' + conffile + ' sampler RC_ABORT',
                             shell=True)
    # NOTE(review): one line is elided here in the reviewed listing; waiting
    # for the abort command before killing is an assumption - confirm.
    abort.wait()

    pidfile = basedir + '/sampler/pid.data'
    # Read the pids up front so the file is closed promptly, and strip the
    # line endings so no newline ends up inside the remote command.
    with open(pidfile, 'r') as handle:
        pids = [line.strip() for line in handle if line.strip()]
    for pid in pids:
        cmd = 'ssh ' + samplerhost + ' "kill ' + pid + '"'
        subprocess.Popen(cmd, shell=True)
def run_master(conffile):
    """Launch ``ereco_master_local`` on the master control host over ssh.

    Reads the host, the execution base directory and the NSM port from the
    configuration, creates ``<execdir_base>/master`` locally when it is
    missing, and starts the remote command without waiting for it. The
    remote output is redirected to ``master/nsmlog.log``.

    Args:
        conffile: Configuration name passed to ``get_ergetconf`` and
            ``get_configpath``.
    """
    host = get_ergetconf(conffile, 'master', 'ctlhost')
    workdir = get_ergetconf(conffile, 'system', 'execdir_base')
    nsmport = get_ergetconf(conffile, 'system', 'nsmport')

    logdir = workdir + '/master'
    if not os.path.exists(logdir):
        os.mkdir(logdir)

    # "setenv" and "> &" are csh syntax - presumably the remote login shell
    # is csh/tcsh.
    pieces = [
        'ssh ', host,
        ' "cd ', workdir,
        '; setenv NSM2_PORT ', nsmport,
        '; ereco_master_local ', get_configpath(conffile),
        ' > & master/nsmlog.log" ',
    ]
    subprocess.Popen(''.join(pieces), shell=True)
def stop_master(conffile):
    """Kill the master processes on the master control host.

    Starts one ``ssh ... kill`` for every process id listed in
    ``<execdir_base>/master/pid.data``. No ``rfcommand`` abort is visible
    for the master in this function.

    Args:
        conffile: Configuration name passed to ``get_ergetconf``.
    """
    masterhost = get_ergetconf(conffile, 'master', 'ctlhost')
    basedir = get_ergetconf(conffile, 'system', 'execdir_base')

    pidfile = basedir + '/master/pid.data'
    # Read the pids up front so the file is closed promptly, and strip the
    # line endings so no newline ends up inside the remote command.
    with open(pidfile, 'r') as handle:
        pids = [line.strip() for line in handle if line.strip()]
    for pid in pids:
        cmd = 'ssh ' + masterhost + ' "kill ' + pid + '"'
        subprocess.Popen(cmd, shell=True)
def start_ereco_components(conffile):
    """Start the event processors, distributor, DQM server and event sampler.

    The components are launched in exactly that order, each with the same
    configuration name.

    Args:
        conffile: Configuration name handed to every launcher.
    """
    launchers = (
        run_eventprocessor,
        run_distributor,
        run_dqmserver,
        run_eventsampler,
    )
    for launch in launchers:
        launch(conffile)
def stop_ereco_components(conffile):
    """Stop the event sampler, DQM server, distributor and event processors.

    The components are stopped in exactly that order, which is the reverse
    of the order used by ``start_ereco_components``.

    Args:
        conffile: Configuration name handed to every stopper.
    """
    stoppers = (
        stop_eventsampler,
        stop_dqmserver,
        stop_distributor,
        stop_eventprocessor,
    )
    for stop in stoppers:
        stop(conffile)
def start_ereco_local(conffile):
    """Start the local ereco system by starting its components.

    Only ``start_ereco_components`` is called; no master start is visible
    in this function.

    Args:
        conffile: Configuration name handed on unchanged.
    """
    start_ereco_components(conffile)
def stop_ereco_local(conffile):
    """Stop the local ereco system: first the components, then the master.

    Args:
        conffile: Configuration name handed to both stop calls.
    """
    for stop in (stop_ereco_components, stop_master):
        stop(conffile)