Package flumotion :: Package service :: Module service
[hide private]

Source Code for Module flumotion.service.service

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This program is free software; you can redistribute it and/or modify 
  9  # it under the terms of the GNU General Public License as published by 
 10  # the Free Software Foundation; either version 2 of the License, or 
 11  # (at your option) any later version. 
 12  # See "LICENSE.GPL" in the source distribution for more information. 
 13   
 14  # This program is also licensed under the Flumotion license. 
 15  # See "LICENSE.Flumotion" in the source distribution for more information. 
 16   
 17  import os 
 18  import glob 
 19  import time 
 20   
 21  from flumotion.configure import configure 
 22  from flumotion.common import common, errors, log 
 23   
 24  """ 
 25  Servicer object used in service scripts 
 26  """ 
class Servicer(log.Loggable):
    """
    I manage running managers and workers on behalf of a service script.

    Manager configurations live in <configdir>/managers/<name>/, each with
    a planet.xml file and an optional flows/ directory of <flow>.xml files.
    Worker configurations live in <configdir>/workers/<name>.xml.
    """

    logCategory = 'servicer'

    def __init__(self, configDir=None, logDir=None, runDir=None):
        """
        @type  configDir: string
        @param configDir: overridden path to the configuration directory;
                          configure.configdir is used when not given.
        @type  logDir:    string
        @param logDir:    overridden path to the log directory.
        @type  runDir:    string
        @param runDir:    overridden path to the run directory.
        """
        # configDir used to be accepted but silently ignored; honour it.
        if not configDir:
            configDir = configure.configdir
        self.managersDir = os.path.join(configDir, 'managers')
        self.workersDir = os.path.join(configDir, 'workers')
        # passed on to spawned daemons as --logdir/--rundir when set
        self._overrideDir = {
            'logdir': logDir,
            'rundir': runDir,
        }

    def _parseManagersWorkers(self, command, args):
        """
        Parse service script arguments into the processes to act on.

        With no args, every configured manager and worker is selected.
        Otherwise args[0] must be 'manager' or 'worker' and args[1] the
        name of an existing configuration of that kind.

        @param command: the command being run; only used in error messages.
        @type  args:    list of str

        @rtype:   tuple of (list of str, list of str)
        @returns: sorted manager names and sorted worker names.
        @raises errors.SystemError: on missing or invalid arguments.
        """
        if not args:
            managers = list(self.getManagers().keys())
            managers.sort()
            # getWorkers() already returns a sorted list
            return (managers, self.getWorkers())

        which = args[0]
        if which not in ('manager', 'worker'):
            raise errors.SystemError(
                'Please specify either manager or worker')

        if len(args) < 2:
            raise errors.SystemError(
                'Please specify which %s to %s' % (which, command))

        name = args[1]
        if which == 'manager':
            if name not in self.getManagers():
                raise errors.SystemError('No manager "%s"' % name)
            return ([name], [])

        if name not in self.getWorkers():
            raise errors.SystemError('No worker with name %s' % name)
        return ([], [name])

    def _getDirOptions(self):
        """
        Return the overridden directories as command line options.

        @rtype:   str
        @returns: space-separated '--key=value' options, one for every
                  directory that was overridden, suitable for appending
                  to a daemon command line; empty if none were overridden.
        """
        keys = list(self._overrideDir.keys())
        keys.sort()  # stable option order regardless of dict ordering
        options = ['--%s=%s' % (key, self._overrideDir[key])
                   for key in keys if self._overrideDir[key]]
        return " ".join(options)

    def _configName(self, path):
        """
        Return the file name of path without directory and extension.

        os.path.splitext only strips the final extension, unlike splitting
        on ".xml", which truncated names containing ".xml" in the middle.
        """
        return os.path.splitext(os.path.basename(path))[0]

    def getManagers(self):
        """
        Find the configured managers and their flows.

        @rtype:   dict of str -> list of str
        @returns: a dictionary of manager names -> sorted flow names
        """
        managers = {}

        self.log('getManagers()')
        if not os.path.exists(self.managersDir):
            return managers

        for managerDir in glob.glob(os.path.join(self.managersDir, '*')):
            # stray files next to the manager directories are not managers
            if not os.path.isdir(managerDir):
                continue
            flows = []  # names of flows
            flowsDir = os.path.join(managerDir, 'flows')
            if os.path.exists(flowsDir):
                for flowFile in glob.glob(os.path.join(flowsDir, '*.xml')):
                    flows.append(self._configName(flowFile))
                # glob order is filesystem dependent; make it deterministic
                flows.sort()
            managerName = os.path.basename(managerDir)
            self.log('Adding flows %r to manager %s' % (flows, managerName))
            managers[managerName] = flows
        self.log('returning managers: %r' % managers)
        return managers

    def getWorkers(self):
        """
        Find the configured workers.

        @rtype:   list of str
        @returns: a sorted list of worker names
        """
        workers = []

        if not os.path.exists(self.workersDir):
            return workers

        for workerFile in glob.glob(os.path.join(self.workersDir, '*.xml')):
            workers.append(self._configName(workerFile))
        workers.sort()
        return workers

    def start(self, args):
        """
        Start processes as given in the args.

        If nothing specified, start all managers and workers.
        If first argument is "manager", start given manager.
        If first argument is "worker", start given worker.

        @returns: an exit value reflecting the number of processes that failed
                  to start
        """
        (managers, workers) = self._parseManagersWorkers('start', args)
        self.debug("Start managers %r and workers %r" % (managers, workers))
        managersDict = self.getManagers()
        exitvalue = 0

        # managers first, so workers have something to connect to
        for name in managers:
            if not self.startManager(name, managersDict[name]):
                exitvalue += 1
        for name in workers:
            if not self.startWorker(name):
                exitvalue += 1

        return exitvalue

    def stop(self, args):
        """
        Stop processes as given in the args.

        If nothing specified, stop all managers and workers.
        If first argument is "manager", stop given manager.
        If first argument is "worker", stop given worker.

        @returns: an exit value reflecting the number of processes that failed
                  to stop
        """
        (managers, workers) = self._parseManagersWorkers('stop', args)
        self.debug("Stop managers %r and workers %r" % (managers, workers))

        exitvalue = 0

        # reverse of start order: workers before managers
        for name in workers:
            if not self.stopWorker(name):
                exitvalue += 1
        for name in managers:
            if not self.stopManager(name):
                exitvalue += 1

        return exitvalue

    def status(self, args):
        """
        Print the status of the processes given in the args: not running,
        running with a pid, or dead with a stale pid file.
        """
        (managers, workers) = self._parseManagersWorkers('status', args)
        self.debug("Status managers %r and workers %r" % (managers, workers))
        for kind, names in [('manager', managers), ('worker', workers)]:
            for name in names:
                pid = common.getPid(kind, name)
                if not pid:
                    print("%s %s not running" % (kind, name))
                    continue
                if common.checkPidRunning(pid):
                    print("%s %s is running with pid %d" % (kind, name, pid))
                else:
                    print("%s %s dead (stale pid %d)" % (kind, name, pid))

    def clean(self, args):
        """
        Clean up dead process pid files as given in the args.
        """
        (managers, workers) = self._parseManagersWorkers('clean', args)
        self.debug("Clean managers %r and workers %r" % (managers, workers))
        for kind, names in [('manager', managers), ('worker', workers)]:
            for name in names:
                pid = common.getPid(kind, name)
                if not pid:
                    # may be a file that contains bogus data
                    # NOTE(review): this branch is also taken when there is
                    # no pid file at all -- confirm deletePidFile copes.
                    print("deleting bogus pid file for %s %s" % (kind, name))
                    common.deletePidFile(kind, name)
                    continue
                if not common.checkPidRunning(pid):
                    self.debug("Cleaning up stale pid %d for %s %s" % (
                        pid, kind, name))
                    print("deleting stale pid file for %s %s" % (kind, name))
                    common.deletePidFile(kind, name)

    def condrestart(self, args):
        """
        Restart running processes as given in the args.

        If nothing specified, condrestart all managers and workers.
        If first argument is "manager", condrestart given manager.
        If first argument is "worker", condrestart given worker.

        Processes that are not running are left alone; processes with a
        stale pid file are reported but not restarted.

        @returns: an exit value reflecting the number of processes that failed
                  to restart
        """
        (managers, workers) = self._parseManagersWorkers('condrestart', args)
        self.debug("condrestart managers %r and workers %r" % (
            managers, workers))
        managersDict = self.getManagers()
        exitvalue = 0

        for kind, names in [('manager', managers), ('worker', workers)]:
            for name in names:
                pid = common.getPid(kind, name)
                if not pid:
                    continue  # not running, so nothing to restart
                if not common.checkPidRunning(pid):
                    print("%s %s dead (stale pid %d)" % (kind, name, pid))
                    continue
                if kind == 'manager':
                    if not self.stopManager(name):
                        exitvalue += 1
                        continue
                    if not self.startManager(name, managersDict[name]):
                        exitvalue += 1
                else:
                    if not self.stopWorker(name):
                        exitvalue += 1
                        continue
                    if not self.startWorker(name):
                        exitvalue += 1

        return exitvalue

    def create(self, args):
        """
        Create a default manager or worker config.

        args is [kind, name] or [kind, name, port], where kind is 'manager'
        or 'worker' and port (default 7531) is the manager port to listen
        on or connect to.

        @raises errors.SystemError: on missing or invalid arguments.
        """
        # TODO: Andy suggested we should be able to customize the
        # configuration this generates.
        # For that we maybe first want to use the Command class way of
        # writing the service script.
        if len(args) == 0 or args[0] not in ('manager', 'worker'):
            raise errors.SystemError(
                "Please specify 'manager' or 'worker' to create.")
        kind = args[0]
        if len(args) == 1:
            raise errors.SystemError(
                "Please specify name of %s to create." % kind)
        name = args[1]

        port = 7531
        if len(args) == 3:
            try:
                port = int(args[2])
            except ValueError:
                raise errors.SystemError(
                    "Invalid port %r, please specify a number." % args[2])

        if kind == 'manager':
            self.createManager(name, port)
        else:
            self.createWorker(name, managerPort=port, randomFeederports=True)

    def createManager(self, name, port=7531):
        """
        Create a sample manager.

        @raises errors.SystemError: if the manager directory already exists.
        @returns: whether or not the config was created.
        """
        self.info("Creating manager %s" % name)
        managerDir = os.path.join(self.managersDir, name)
        if os.path.exists(managerDir):
            raise errors.SystemError(
                "Manager directory %s already exists" % managerDir)
        os.makedirs(managerDir)

        planetFile = os.path.join(managerDir, 'planet.xml')

        # generate the file
        handle = open(planetFile, 'w')
        try:
            handle.write("""<planet>
  <manager>
    <debug>4</debug>
    <host>localhost</host>
    <port>%(port)d</port>
    <transport>ssl</transport>
    <!-- certificate path can be relative to $sysconfdir/flumotion,
         or absolute -->
    <!--
    <certificate>default.pem</certificate>
    -->
    <component name="manager-bouncer" type="htpasswdcrypt-bouncer">
      <property name="data"><![CDATA[
user:PSfNpHTkpTx1M
      ]]></property>
    </component>
  </manager>
</planet>
""" % {'port': port})
        finally:
            handle.close()

        # create a default.pem file if it doesn't exist yet
        # NOTE(review): run through the shell unquoted, so paths containing
        # spaces or shell metacharacters are not supported.
        pemFile = os.path.join(configure.configdir, 'default.pem')
        if not os.path.exists(pemFile):
            os.system("%s %s" % (
                os.path.join(configure.datadir, 'make-dummy-cert'), pemFile))

        return True

    def createWorker(self, name, managerPort=7531, randomFeederports=False):
        """
        Create a sample worker.

        @param managerPort:       port of the manager the worker connects to.
        @param randomFeederports: use random feeder ports instead of leaving
                                  the feederports range commented out.
        @raises errors.SystemError: if the worker file already exists.
        @returns: whether or not the config was created.
        """
        self.info("Creating worker %s" % name)
        # the workers directory is shared by all workers; os.makedirs raises
        # OSError if it already exists, so only create it the first time
        if not os.path.isdir(self.workersDir):
            os.makedirs(self.workersDir)
        workerFile = os.path.join(self.workersDir, "%s.xml" % name)
        if os.path.exists(workerFile):
            raise errors.SystemError(
                "Worker file %s already exists." % workerFile)

        feederports = "  <!-- <feederports>8600-8639</feederports> -->"
        if randomFeederports:
            feederports = '  <feederports random="True" />'

        # generate the file
        handle = open(workerFile, 'w')
        try:
            handle.write("""<worker>

  <debug>4</debug>

  <manager>
    <host>localhost</host>
    <port>%(managerPort)s</port>
  </manager>

  <authentication type="plaintext">
    <username>user</username>
    <password>test</password>
  </authentication>

%(feederports)s

</worker>
""" % {'managerPort': managerPort, 'feederports': feederports})
        finally:
            handle.close()

        return True

    def _checkNotRunning(self, kind, name):
        """
        Make sure there is no pid file for the given process.

        @param kind: 'manager' or 'worker'
        @raises errors.SystemError: if a pid was found, whether the process
                                    is still running or the pid is stale.
        """
        pid = common.getPid(kind, name)
        if not pid:
            return
        if common.checkPidRunning(pid):
            raise errors.SystemError(
                "%s %s is already running (with pid %d)" % (
                    kind.capitalize(), name, pid))
        raise errors.SystemError(
            "%s %s is dead (stale pid %d)" % (kind.capitalize(), name, pid))

    def _daemonize(self, kind, name, command):
        """
        Run a daemonizing command, then wait for the daemon's pid file
        using common.waitPidFile.

        @param kind: 'manager' or 'worker'
        @returns: whether the daemon started
        """
        self.debug("Running %s" % command)
        retval = self.startProcess(command)
        if retval != 0:
            self.warning("%s %s could not start (return value %d)" % (
                kind, name, retval))
            return False

        self.debug("Waiting for pid for %s %s" % (kind, name))
        pid = common.waitPidFile(kind, name)
        if not pid:
            self.warning("%s %s could not start" % (kind, name))
            return False
        self.info("Started %s %s with pid %d" % (kind, name, pid))
        return True

    def startManager(self, name, flowNames):
        """
        Start the manager as configured in the manager directory for the given
        manager name, together with the given flows.

        @raises errors.SystemError: if the planet or a flow file is missing,
                                    or a pid file exists for the manager.
        @returns: whether or not the manager daemon started
        """
        self.info("Starting manager %s" % name)
        self.debug("Starting manager with flows %r" % flowNames)
        managerDir = os.path.join(self.managersDir, name)
        planetFile = os.path.join(managerDir, 'planet.xml')
        if not os.path.exists(planetFile):
            raise errors.SystemError(
                "Planet file %s does not exist" % planetFile)
        self.info("Loading planet %s" % planetFile)

        flowsDir = os.path.join(managerDir, 'flows')
        flowFiles = []
        for flowName in flowNames:
            flowFile = os.path.join(flowsDir, "%s.xml" % flowName)
            if not os.path.exists(flowFile):
                raise errors.SystemError(
                    "Flow file %s does not exist" % flowFile)
            flowFiles.append(flowFile)
            self.info("Loading flow %s" % flowFile)

        self._checkNotRunning('manager', name)

        # NOTE(review): built as a shell string, so names and paths containing
        # spaces or shell metacharacters are not supported.
        command = "flumotion-manager %s -D --daemonize-to %s " \
                  "--service-name %s %s %s" % (
                      self._getDirOptions(), configure.daemondir, name,
                      planetFile, " ".join(flowFiles))
        return self._daemonize('manager', name, command)

    def startWorker(self, name):
        """
        Start the worker as configured in the worker directory for the given
        worker name.

        @raises errors.SystemError: if the worker file is missing, or a pid
                                    file exists for the worker.
        @returns: whether or not the worker daemon started
        """
        self.info("Starting worker %s" % name)
        workerFile = os.path.join(self.workersDir, "%s.xml" % name)
        if not os.path.exists(workerFile):
            raise errors.SystemError(
                "Worker file %s does not exist" % workerFile)

        self._checkNotRunning('worker', name)

        # we are sure the worker is not running and there's no pid file
        self.info("Loading worker %s" % workerFile)

        # NOTE(review): built as a shell string, so names and paths containing
        # spaces or shell metacharacters are not supported.
        command = "flumotion-worker %s -D --daemonize-to %s " \
                  "--service-name %s %s" % (
                      self._getDirOptions(), configure.daemondir, name,
                      workerFile)
        return self._daemonize('worker', name, command)

    def startProcess(self, command):
        """
        Run the given shell command and block until it exits.

        @returns: the exit status of the process, or -1 if it did not exit
                  normally (e.g. it was killed by a signal).
        """
        status = os.system(command)
        if os.WIFEXITED(status):
            return os.WEXITSTATUS(status)

        # definitely something wrong
        return -1

    def _stopDaemon(self, kind, name):
        """
        Stop the given daemon if it is running.

        @param kind: 'manager' or 'worker'
        @returns: True if it was not running or was stopped; False if its
                  pid is stale or it could not be stopped.
        """
        self.info("Stopping %s %s" % (kind, name))
        pid = common.getPid(kind, name)
        if not pid:
            self.info("%s %s was not running" % (kind, name))
            return True

        # FIXME: ensure a correct process is running this pid
        if not common.checkPidRunning(pid):
            self.info("%s %s is dead (stale pid %d)" % (
                kind.capitalize(), name, pid))
            return False

        self.debug('Stopping %s %s with pid %d' % (kind, name, pid))
        if not self.stopProcess(pid):
            return False

        self.info('Stopped %s %s with pid %d' % (kind, name, pid))
        return True

    def stopManager(self, name):
        """
        Stop the given manager if it is running.

        @returns: whether the manager is known to be stopped
        """
        return self._stopDaemon('manager', name)

    def stopWorker(self, name):
        """
        Stop the given worker if it is running.

        @returns: whether the worker is known to be stopped
        """
        return self._stopDaemon('worker', name)

    def stopProcess(self, pid):
        """
        Stop the process with the given pid and wait until it has disappeared.

        The process is sent TERM first. After configure.processTermWait
        seconds it is sent KILL. If it is still alive a further
        configure.processKillWait seconds later, give up.

        @returns: whether the process is gone
        """
        # time.clock() is processor time on Unix (and gone in Python 3.8),
        # so measure the deadlines against the wall clock instead.
        termDeadline = time.time() + configure.processTermWait
        killDeadline = termDeadline + configure.processKillWait
        killSent = False

        self.debug('stopping process with pid %d' % pid)
        if not common.termPid(pid):
            self.warning('No process with pid %d' % pid)
            return False

        while common.checkPidRunning(pid):
            now = time.time()
            if not killSent and now > termDeadline:
                self.warning("Process with pid %d has not responded to TERM "
                             "for %d seconds, killing" % (
                                 pid, configure.processTermWait))
                common.killPid(pid)
                killSent = True

            if now > killDeadline:
                self.warning("Process with pid %d has not responded to KILL "
                             "for %d seconds, stopping" % (
                                 pid, configure.processKillWait))
                return False

            # poll instead of spinning the CPU while waiting
            time.sleep(0.1)

        return True

    def list(self):
        """
        Print all managers (with their flows) and all workers, sorted by name.
        """
        managers = self.getManagers()
        managerNames = list(managers.keys())
        managerNames.sort()
        for name in managerNames:
            print("manager %s" % name)
            for flow in managers[name]:
                print("  flow %s" % flow)

        for worker in self.getWorkers():
            print("worker %s" % worker)
588