1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 import os
18 import glob
19 import time
20
21 from flumotion.configure import configure
22 from flumotion.common import common, errors, log
23
24 """
25 Servicer object used in service scripts
26 """
class Servicer(log.Loggable):
    """
    I manage running managers and workers on behalf of a service script.
    """

    # Category name for this object's log messages; presumably consumed by
    # the log.Loggable base class -- confirm against flumotion.common.log.
    logCategory = 'servicer'
def __init__(self, configDir=None, logDir=None, runDir=None):
    """
    Set up the directories this servicer works with.

    @type  configDir: string
    @param configDir: overridden path to the configuration directory;
                      when not given, configure.configdir is used.
    @type  logDir:    string
    @param logDir:    overridden path to the log directory.
    @type  runDir:    string
    @param runDir:    overridden path to the run directory.
    """
    # Honour an explicitly requested configuration directory; fall back to
    # the compiled-in default only when none was given.
    if not configDir:
        configDir = configure.configdir
    self.managersDir = os.path.join(configDir, 'managers')
    self.workersDir = os.path.join(configDir, 'workers')
    # Directory overrides that are handed on to spawned daemons as
    # --logdir/--rundir command-line options.
    self._overrideDir = {
        'logdir': logDir,
        'rundir': runDir,
    }
def _parseManagersWorkers(self, command, args):
    """
    Work out which managers and workers a service command applies to.

    With no arguments every configured manager and worker is selected.
    Otherwise args[0] must be 'manager' or 'worker' and args[1] the name
    of an existing one of that kind.

    @param command: name of the service command, used in error messages.
    @param args:    the remaining command-line arguments.
    @returns: a (managers, workers) tuple of name lists.
    @raises errors.SystemError: on an unknown kind, or a missing or
                                unknown name.
    """
    if not args:
        return (sorted(self.getManagers().keys()),
                sorted(self.getWorkers()))

    kind = args[0]
    if kind not in ('manager', 'worker'):
        raise errors.SystemError('Please specify either manager or worker')
    if len(args) < 2:
        raise errors.SystemError('Please specify which %s to %s' % (
            kind, command))

    name = args[1]
    if kind == 'manager':
        if name not in self.getManagers():
            raise errors.SystemError('No manager "%s"' % name)
        return ([name], [])

    if name not in self.getWorkers():
        raise errors.SystemError('No worker with name %s' % name)
    return ([], [name])
84
def _getDirOptions(self):
    """
    Return the overridden directories as command-line options.

    Every entry of self._overrideDir with a non-empty value becomes a
    '--key=value' option; the options are joined with single spaces so the
    result can be appended to a command line.

    NOTE(review): values are not shell-quoted, so a path containing spaces
    will be split by the shell that runs the command.

    @rtype: string
    """
    return " ".join(['--%s=%s' % (option, path)
                     for (option, path) in self._overrideDir.items()
                     if path])
95
def getManagers(self):
    """
    Find the configured managers and their flows.

    Each subdirectory of self.managersDir is a manager; the *.xml files in
    its 'flows' subdirectory are its flows. Plain files lying directly in
    self.managersDir are ignored.

    @returns: a dictionary of manager names -> sorted list of flow names
    """
    managers = {}

    self.log('getManagers()')
    if not os.path.exists(self.managersDir):
        return managers

    for managerDir in glob.glob(os.path.join(self.managersDir, '*')):
        # A stray file here (README, backup, ...) is not a manager; listing
        # it would make 'start' fail on a missing planet.xml.
        if not os.path.isdir(managerDir):
            continue
        flowsDir = os.path.join(managerDir, 'flows')
        # glob returns [] when flowsDir does not exist.
        flowFiles = glob.glob(os.path.join(flowsDir, '*.xml'))
        # splitext strips only the trailing '.xml', so a flow file such as
        # 'a.xml.b.xml' keeps its full name 'a.xml.b'.
        flows = sorted([os.path.splitext(os.path.basename(flowFile))[0]
                        for flowFile in flowFiles])
        managerName = os.path.basename(managerDir)
        self.log('Adding flows %r to manager %s' % (flows, managerName))
        managers[managerName] = flows
    self.log('returning managers: %r' % managers)
    return managers
121
def getWorkers(self):
    """
    Find the configured workers.

    Every *.xml file in self.workersDir is a worker configuration; the
    worker name is the file name without its trailing '.xml'.

    @returns: a sorted list of worker names (empty if workersDir is absent)
    """
    if not os.path.exists(self.workersDir):
        return []

    pattern = os.path.join(self.workersDir, '*.xml')
    # splitext strips only the final extension, so names that themselves
    # contain '.xml' are not truncated.
    return sorted([os.path.splitext(os.path.basename(workerFile))[0]
                   for workerFile in glob.glob(pattern)])
137
def start(self, args):
    """
    Start the managers and/or workers selected by args.

    No arguments start everything; 'manager NAME' or 'worker NAME' start
    only that one. Managers are started before workers.

    @returns: the number of processes that failed to start (0 when all
              succeeded), suitable as an exit value.
    """
    (managers, workers) = self._parseManagersWorkers('start', args)
    self.debug("Start managers %r and workers %r" % (managers, workers))
    flowsByManager = self.getManagers()

    outcomes = []
    for managerName in managers:
        outcomes.append(
            self.startManager(managerName, flowsByManager[managerName]))
    for workerName in workers:
        outcomes.append(self.startWorker(workerName))

    return len([ok for ok in outcomes if not ok])
162
def stop(self, args):
    """
    Stop the managers and/or workers selected by args.

    No arguments stop everything; 'manager NAME' or 'worker NAME' stop
    only that one.

    @returns: the number of processes that failed to stop (0 when all
              succeeded), suitable as an exit value.
    """
    (managers, workers) = self._parseManagersWorkers('stop', args)
    self.debug("Stop managers %r and workers %r" % (managers, workers))

    failed = 0
    # Workers are stopped first, then the managers.
    for stopper, names in ((self.stopWorker, workers),
                           (self.stopManager, managers)):
        for name in names:
            if not stopper(name):
                failed += 1
    return failed
187
def status(self, args):
    """
    Print the state of each manager and worker selected by args.

    One line is printed per process: "not running" when common.getPid
    returns no pid, "is running" when that pid is alive, and "dead" with
    the stale pid otherwise.
    """
    (managers, workers) = self._parseManagersWorkers('status', args)
    self.debug("Status managers %r and workers %r" % (managers, workers))
    for kind, names in [('manager', managers), ('worker', workers)]:
        for name in names:
            pid = common.getPid(kind, name)
            if not pid:
                message = "%s %s not running" % (kind, name)
            elif common.checkPidRunning(pid):
                message = "%s %s is running with pid %d" % (kind, name, pid)
            else:
                message = "%s %s dead (stale pid %d)" % (kind, name, pid)
            print(message)
204
def clean(self, args):
    """
    Remove pid files of selected processes that are not running.

    When common.getPid returns no pid the pid file is deleted as bogus;
    when the pid it returns is no longer running the file is deleted as
    stale. Pid files of running processes are left alone.
    """
    (managers, workers) = self._parseManagersWorkers('clean', args)
    self.debug("Clean managers %r and workers %r" % (managers, workers))
    for kind, names in [('manager', managers), ('worker', workers)]:
        for name in names:
            pid = common.getPid(kind, name)
            if pid and common.checkPidRunning(pid):
                continue
            if pid:
                self.debug("Cleaning up stale pid %d for %s %s" % (
                    pid, kind, name))
                print("deleting stale pid file for %s %s" % (kind, name))
            else:
                # NOTE(review): getPid presumably also returns no pid when
                # the pid file does not exist at all; confirm that
                # deletePidFile tolerates a missing file in that case.
                print("deleting bogus pid file for %s %s" % (kind, name))
            common.deletePidFile(kind, name)
224
def condrestart(self, args):
    """
    Restart those selected managers and workers that are currently running.

    No arguments consider everything; 'manager NAME' or 'worker NAME'
    consider only that one. Processes without a pid are skipped; processes
    whose pid is stale are reported and skipped.

    @returns: the number of processes that failed to stop or to start
              again, suitable as an exit value.
    """
    (managers, workers) = self._parseManagersWorkers('condrestart', args)
    self.debug("condrestart managers %r and workers %r" % (
        managers, workers))
    managersDict = self.getManagers()

    def restartManager(name):
        # a failed stop counts as the failure; no start is attempted
        if not self.stopManager(name):
            return False
        return self.startManager(name, managersDict[name])

    def restartWorker(name):
        if not self.stopWorker(name):
            return False
        return self.startWorker(name)

    failures = 0
    for kind, names, restart in [('manager', managers, restartManager),
                                 ('worker', workers, restartWorker)]:
        for name in names:
            pid = common.getPid(kind, name)
            if not pid:
                continue
            if not common.checkPidRunning(pid):
                print("%s %s dead (stale pid %d)" % (kind, name, pid))
                continue
            if not restart(name):
                failures += 1
    return failures
264
def create(self, args):
    """
    Create a default manager or worker config.

    args is [kind, name] or [kind, name, port], where kind is 'manager' or
    'worker'. For a manager, port is the port it listens on; for a worker,
    it is the manager port the worker connects to. port defaults to 7531.

    @raises errors.SystemError: if kind or name is missing, kind is
                                unknown, or port is not an integer.
    """
    if len(args) == 0:
        raise errors.SystemError(
            "Please specify 'manager' or 'worker' to create.")
    kind = args[0]
    # Validate the kind before asking for a name, so "create foo" reports
    # the real problem instead of asking for the name of a "foo".
    if kind not in ('manager', 'worker'):
        raise errors.SystemError(
            "Please specify 'manager' or 'worker' to create.")
    if len(args) == 1:
        raise errors.SystemError(
            "Please specify name of %s to create." % kind)
    name = args[1]

    port = 7531
    if len(args) > 2:
        # Report a bad port through the same error type as the other
        # argument problems instead of leaking a raw ValueError.
        try:
            port = int(args[2])
        except ValueError:
            raise errors.SystemError(
                "Invalid port %r; please specify a number." % args[2])

    if kind == 'manager':
        self.createManager(name, port)
    else:
        self.createWorker(name, managerPort=port, randomFeederports=True)
293
def createManager(self, name, port=7531):
    """
    Create a sample manager configuration.

    Writes <managersDir>/<name>/planet.xml describing a manager on
    localhost:port using the ssl transport, and runs make-dummy-cert to
    create default.pem in configure.configdir if that file is missing.

    @param name: name of the manager (its directory under managersDir).
    @param port: port written into the planet configuration.
    @returns: True once the config was written.
    @raises errors.SystemError: if the manager directory already exists.
    """
    self.info("Creating manager %s" % name)
    managerDir = os.path.join(self.managersDir, name)
    if os.path.exists(managerDir):
        raise errors.SystemError(
            "Manager directory %s already exists" % managerDir)
    os.makedirs(managerDir)

    planetFile = os.path.join(managerDir, 'planet.xml')
    handle = open(planetFile, 'w')
    # try/finally so the file is closed even if the write fails
    try:
        handle.write("""<planet>
<manager>
<debug>4</debug>
<host>localhost</host>
<port>%(port)d</port>
<transport>ssl</transport>
<!-- certificate path can be relative to $sysconfdir/flumotion,
or absolute -->
<!--
<certificate>default.pem</certificate>
-->
<component name="manager-bouncer" type="htpasswdcrypt-bouncer">
<property name="data"><![CDATA[
user:PSfNpHTkpTx1M
]]></property>
</component>
</manager>
</planet>
""" % {'port': port})
    finally:
        handle.close()

    pemFile = os.path.join(configure.configdir, 'default.pem')
    if not os.path.exists(pemFile):
        makeCert = os.path.join(configure.datadir, 'make-dummy-cert')
        status = os.system("%s %s" % (makeCert, pemFile))
        # Previously a failing certificate helper went unnoticed.
        if status != 0:
            self.warning("Could not create certificate %s (status %d)" % (
                pemFile, status))

    return True
339
def createWorker(self, name, managerPort=7531, randomFeederports=False):
    """
    Create a sample worker configuration.

    Writes <workersDir>/<name>.xml for a worker connecting to the manager
    on localhost:managerPort, creating workersDir first if it is missing.

    @param name:              name of the worker (its config file basename).
    @param managerPort:       manager port written into the config.
    @param randomFeederports: if True, write a random feederports element;
                              otherwise write a commented-out example range.
    @returns: True once the config was written.
    @raises errors.SystemError: if the worker file already exists.
    """
    # os.makedirs raises OSError for an existing directory, which used to
    # make every worker after the first one impossible to create.
    if not os.path.isdir(self.workersDir):
        os.makedirs(self.workersDir)
    self.info("Creating worker %s" % name)
    workerFile = os.path.join(self.workersDir, "%s.xml" % name)
    if os.path.exists(workerFile):
        raise errors.SystemError(
            "Worker file %s already exists." % workerFile)

    feederports = " <!-- <feederports>8600-8639</feederports> -->"
    if randomFeederports:
        feederports = ' <feederports random="True" />'

    handle = open(workerFile, 'w')
    # try/finally so the file is closed even if the write fails
    try:
        handle.write("""<worker>

<debug>4</debug>

<manager>
<host>localhost</host>
<port>%(managerPort)s</port>
</manager>

<authentication type="plaintext">
<username>user</username>
<password>test</password>
</authentication>

%(feederports)s

</worker>
""" % {'managerPort': managerPort, 'feederports': feederports})
    finally:
        handle.close()

    return True
379
380
def startManager(self, name, flowNames):
    """
    Launch the daemonized manager `name` together with the given flows.

    The manager's planet.xml and every requested flows/<flow>.xml must
    exist, and no pid may be recorded for the manager yet.

    @param name:      manager name (its directory under self.managersDir).
    @param flowNames: names of the flows to load, without '.xml'.
    @returns: True if the daemon started and its pid file appeared,
              False otherwise.
    @raises errors.SystemError: on missing configuration files, or when a
                                pid (live or stale) is already recorded.
    """
    self.info("Starting manager %s" % name)
    self.debug("Starting manager with flows %r" % flowNames)

    managerDir = os.path.join(self.managersDir, name)
    planetFile = os.path.join(managerDir, 'planet.xml')
    if not os.path.exists(planetFile):
        raise errors.SystemError(
            "Planet file %s does not exist" % planetFile)
    self.info("Loading planet %s" % planetFile)

    flowsDir = os.path.join(managerDir, 'flows')
    flowFiles = []
    for flowName in flowNames:
        path = os.path.join(flowsDir, "%s.xml" % flowName)
        if not os.path.exists(path):
            raise errors.SystemError("Flow file %s does not exist" % path)
        flowFiles.append(path)
        self.info("Loading flow %s" % path)

    existing = common.getPid('manager', name)
    if existing:
        if common.checkPidRunning(existing):
            template = "Manager %s is already running (with pid %d)"
        else:
            template = "Manager %s is dead (stale pid %d)"
        raise errors.SystemError(template % (name, existing))

    command = ("flumotion-manager %s -D --daemonize-to %s "
               "--service-name %s %s %s" % (
                   self._getDirOptions(), configure.daemondir, name,
                   planetFile, " ".join(flowFiles)))
    self.debug("starting process %s" % command)
    retval = self.startProcess(command)
    if retval != 0:
        self.warning("manager %s could not start (return value %d)" % (
            name, retval))
        return False

    self.debug("Waiting for pid for manager %s" % name)
    pid = common.waitPidFile('manager', name)
    if not pid:
        self.warning("manager %s could not start" % name)
        return False
    self.info("Started manager %s with pid %d" % (name, pid))
    return True
437
def startWorker(self, name):
    """
    Launch the daemonized worker `name` from <workersDir>/<name>.xml.

    @param name: worker name (its config file basename).
    @returns: True if the daemon started and its pid file appeared,
              False otherwise.
    @raises errors.SystemError: if the worker file is missing, or when a
                                pid (live or stale) is already recorded.
    """
    self.info("Starting worker %s" % name)
    workerFile = os.path.join(self.workersDir, "%s.xml" % name)
    if not os.path.exists(workerFile):
        raise errors.SystemError(
            "Worker file %s does not exist" % workerFile)

    existing = common.getPid('worker', name)
    if existing:
        if common.checkPidRunning(existing):
            template = "Worker %s is already running (with pid %d)"
        else:
            template = "Worker %s is dead (stale pid %d)"
        raise errors.SystemError(template % (name, existing))

    self.info("Loading worker %s" % workerFile)

    command = ("flumotion-worker %s -D --daemonize-to %s "
               "--service-name %s %s" % (
                   self._getDirOptions(), configure.daemondir, name,
                   workerFile))
    self.debug("Running %s" % command)
    retval = self.startProcess(command)
    if retval != 0:
        self.warning("worker %s could not start (return value %d)" % (
            name, retval))
        return False

    self.debug("Waiting for pid for worker %s" % name)
    pid = common.waitPidFile('worker', name)
    if not pid:
        self.warning("worker %s could not start" % name)
        return False
    self.info("Started worker %s with pid %d" % (name, pid))
    return True
483
def startProcess(self, command):
    """
    Run `command` through the shell and block until it finishes.

    @returns: the command's exit status if it exited normally, or -1 if
              it terminated any other way.
    """
    waitStatus = os.system(command)
    if not os.WIFEXITED(waitStatus):
        return -1
    return os.WEXITSTATUS(waitStatus)
496
517
def stopWorker(self, name):
    """
    Stop worker `name` if it is running.

    @returns: True if no pid was recorded or the process was stopped;
              False if the recorded pid is stale or stopProcess failed.
    """
    self.info("Stopping worker %s" % name)
    pid = common.getPid('worker', name)
    if not pid:
        self.info("worker %s was not running" % name)
        return True

    if not common.checkPidRunning(pid):
        self.info("Worker %s is dead (stale pid %d)" % (name, pid))
        return False

    self.debug('Stopping worker %s with pid %d' % (name, pid))
    if self.stopProcess(pid):
        self.info('Stopped worker %s with pid %d' % (name, pid))
        return True
    return False
539
572
def list(self):
    """
    List all service parts managed.

    Prints one line per manager in sorted name order, each followed by one
    line per flow, then one line per worker in the order getWorkers
    returns them.
    """
    managers = self.getManagers()
    # Sort so the output is stable and matches the manager order used
    # when no specific manager is selected.
    for name in sorted(managers.keys()):
        print("manager %s" % name)
        for flow in managers[name]:
            print(" flow %s" % flow)

    for worker in self.getWorkers():
        print("worker %s" % worker)
588