Package flumotion :: Package manager :: Module manager
[hide private]

Source Code for Module flumotion.manager.manager

   1  # -*- Mode: Python; test-case-name: flumotion.test.test_manager_manager -*- 
   2  # vi:si:et:sw=4:sts=4:ts=4 
   3  # 
   4  # Flumotion - a streaming media server 
   5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
   6  # All rights reserved. 
   7   
   8  # This file may be distributed and/or modified under the terms of 
   9  # the GNU General Public License version 2 as published by 
  10  # the Free Software Foundation. 
  11  # This file is distributed without any warranty; without even the implied 
  12  # warranty of merchantability or fitness for a particular purpose. 
  13  # See "LICENSE.GPL" in the source distribution for more information. 
  14   
  15  # Licensees having purchased or holding a valid Flumotion Advanced 
  16  # Streaming Server license may use this file in accordance with the 
  17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
  18  # See "LICENSE.Flumotion" in the source distribution for more information. 
  19   
  20  # Headers in this file shall remain intact. 
  21   
  22  """ 
  23  manager implementation and related classes 
  24   
  25  API Stability: semi-stable 
  26   
  27  @var  LOCAL_IDENTITY: an identity for the manager itself; can be used 
  28                        to compare against to verify that the manager 
  29                        requested an action 
  30  @type LOCAL_IDENTITY: L{LocalIdentity} 
  31  """ 
  32   
  33  __all__ = ['ManagerServerFactory', 'Vishnu'] 
  34   
  35  import os 
  36   
  37  from twisted.internet import reactor, defer 
  38  from twisted.python import components, failure 
  39  from twisted.spread import pb 
  40  from twisted.cred import portal 
  41  from zope.interface import implements 
  42   
  43  from flumotion.common import bundle, config, errors, interfaces, log, registry 
  44  from flumotion.common import planet, common, dag, messages, reflectcall, server 
  45  from flumotion.common.identity import RemoteIdentity, LocalIdentity 
  46  from flumotion.common.planet import moods 
  47  from flumotion.configure import configure 
  48  from flumotion.manager import admin, component, worker, base 
  49  from flumotion.twisted import checkers 
  50  from flumotion.twisted import portal as fportal 
  51   
  52  from flumotion.common.messages import N_ 
  53  T_ = messages.gettexter('flumotion') 
  54   
  55  LOCAL_IDENTITY = LocalIdentity('manager') 
  56   
57 -def _find(list, value, proc=lambda x: x):
58 return list[[proc(x) for x in list].index(value)]
59
60 -def _first(list, proc=lambda x: x):
61 for x in list: 62 if proc(x): return x
63
64 -def _any(list, proc=lambda x: x):
65 return filter(proc, list)
66
67 -def _fint(*procs):
68 # intersection of functions 69 def int(*args, **kwargs): 70 for p in procs: 71 if not p(*args, **kwargs): return False 72 return True
73 return int 74 75 76 # an internal class
class Dispatcher(log.Loggable):
    """
    I implement L{twisted.cred.portal.IRealm}.

    When a L{pb.Avatar} is requested through me, the avatar I hand back
    is created with the mind (client) that asked for it.
    """

    implements(portal.IRealm)

    logCategory = 'dispatcher'

    def __init__(self, computeIdentity):
        """
        @param computeIdentity: see L{Vishnu.computeIdentity}
        @type  computeIdentity: callable
        """
        self._computeIdentity = computeIdentity
        self._interfaceHeavens = {} # interface -> heaven
        self._avatarKeycards = {} # avatarId -> keycard
        self._bouncer = None

    def setBouncer(self, bouncer):
        """
        Set the bouncer whose keycards are released when an avatar
        disconnects.

        @param bouncer: the bouncer to authenticate with
        @type  bouncer: L{flumotion.component.bouncers.bouncer}
        """
        self._bouncer = bouncer

    def registerHeaven(self, heaven, interface):
        """
        Register a Heaven as managing components with the given interface.

        @type  interface: L{twisted.python.components.Interface}
        @param interface: a component interface to register the heaven with.
        """
        assert isinstance(heaven, base.ManagerHeaven)

        self._interfaceHeavens[interface] = heaven

    ### IRealm methods
    def requestAvatar(self, avatarId, keycard, mind, *ifaces):
        """
        Create an avatar for avatarId through the heaven registered for
        the non-IPerspective interface in ifaces.

        @returns: a deferred firing (pb.IPerspective, avatar, cleanup)
        @raises errors.NoPerspectiveError: unless ifaces is exactly
                pb.IPerspective plus one registered interface
        """
        # Exactly two interfaces are accepted: IPerspective and the
        # specific avatar interface.
        if pb.IPerspective not in ifaces or len(ifaces) != 2:
            raise errors.NoPerspectiveError(avatarId)
        iface = [i for i in ifaces if i != pb.IPerspective][0]
        if iface not in self._interfaceHeavens:
            self.warning('unknown interface %r', iface)
            raise errors.NoPerspectiveError(avatarId)

        heaven = self._interfaceHeavens[iface]
        avatarClass = heaven.avatarClass

        def gotAvatar(avatar):
            if avatar.avatarId in heaven.avatars:
                raise errors.AlreadyConnectedError(avatar.avatarId)
            heaven.avatars[avatar.avatarId] = avatar
            self._avatarKeycards[avatar.avatarId] = keycard

            # OK so this is byzantine, but test_manager_manager actually
            # uses these kwargs to set its own info. so don't change
            # these args or their order or you will break your test
            # suite.
            def cleanup(avatarId=avatar.avatarId, avatar=avatar, mind=mind):
                self.info('lost connection to client %r', avatar)
                del heaven.avatars[avatar.avatarId]
                avatar.onShutdown()
                # avoid leaking the keycard
                storedKeycard = self._avatarKeycards.pop(avatarId)
                if not self._bouncer:
                    return
                try:
                    self._bouncer.removeKeycard(storedKeycard)
                except KeyError:
                    self.warning("bouncer forgot about keycard %r",
                                 storedKeycard)

            return (pb.IPerspective, avatar, cleanup)

        def gotError(reason):
            # Drop the connection, but only after the failure has been
            # sent: calling loseConnection() from the next reactor
            # iteration lets the current data reach the client first.
            reactor.callLater(0, mind.broker.transport.loseConnection)
            return reason

        host = common.addressGetHost(mind.broker.transport.getPeer())
        d = self._computeIdentity(keycard, host)
        d.addCallback(lambda identity:
                      avatarClass.makeAvatar(heaven, avatarId, identity, mind))
        d.addCallbacks(gotAvatar, gotError)
        return d
class ComponentMapper:
    """
    Ties together the objects that belong to one component.

    Instances are the values of the vishnu's component lookup dict,
    which is keyed by more than one of these objects.
    """
    def __init__(self):
        # state:    ManagerComponentState; created first
        # id:       avatarId of the eventual ComponentAvatar
        # avatar:   the ComponentAvatar
        # jobState: ManagerJobState of a running component
        self.state, self.id, self.avatar, self.jobState = None, None, None, None
182
183 -class Vishnu(log.Loggable):
184 """ 185 I am the toplevel manager object that knows about all heavens and factories. 186 187 @cvar dispatcher: dispatcher to create avatars 188 @type dispatcher: L{Dispatcher} 189 @cvar workerHeaven: the worker heaven 190 @type workerHeaven: L{worker.WorkerHeaven} 191 @cvar componentHeaven: the component heaven 192 @type componentHeaven: L{component.ComponentHeaven} 193 @cvar adminHeaven: the admin heaven 194 @type adminHeaven: L{admin.AdminHeaven} 195 """ 196 197 implements(server.IServable) 198 199 logCategory = "vishnu" 200
def __init__(self, name, unsafeTracebacks=0, configDir=None):
    """
    @param name:             name of this manager; stored in the planet
                             state and used for the default config dir
    @param unsafeTracebacks: passed on to L{pb.PBServerFactory}
    @param configDir:        configuration directory; defaults to
                             configure.configdir/managers/<name>
    """
    # the dispatcher hands out avatars to connecting clients and asks
    # this manager to compute their identities
    self.dispatcher = Dispatcher(self.computeIdentity)

    # one heaven per medium interface, each registered with the dispatcher
    self.workerHeaven = self._createHeaven(interfaces.IWorkerMedium,
                                           worker.WorkerHeaven)
    self.componentHeaven = self._createHeaven(interfaces.IComponentMedium,
                                              component.ComponentHeaven)
    self.adminHeaven = self._createHeaven(interfaces.IAdminMedium,
                                          admin.AdminHeaven)

    # flipped to False by a 'before shutdown' reactor trigger
    self.running = True
    def _markStopped():
        self.running = False
    reactor.addSystemEventTrigger('before', 'shutdown', _markStopped)

    if configDir is None:
        configDir = os.path.join(configure.configdir, "managers", name)
    self.configDir = configDir

    self.bouncer = None # used by manager to authenticate worker/component

    self.bundlerBasket = registry.getRegistry().makeBundlerBasket()

    self._componentMappers = {} # any object -> ComponentMapper

    self.state = planet.ManagerPlanetState()
    self.state.set('name', name)

    self.plugs = {} # socket -> list of plugs

    # the portal lets clients connect through our dispatcher (the IRealm);
    # it starts without a bouncer, which setBouncer fills in later
    self.portal = fportal.BouncerPortal(self.dispatcher, None)
    self.factory = pb.PBServerFactory(self.portal,
                                      unsafeTracebacks=unsafeTracebacks)
    self.connectionInfo = {}
    self.setConnectionInfo(None, None, None)
243
def shutdown(self):
    """
    Cancel any pending operations in preparation for shutdown.

    This is mostly useful for unit tests; it is not currently called
    during normal operation. Stopping listening on the port is up to the
    caller, because the manager holds no handle on the twisted port
    object.

    @returns: a deferred that fires when the manager has shut down
    """
    if not self.bouncer:
        return defer.succeed(None)
    return self.bouncer.stop()
259
def setConnectionInfo(self, host, port, use_ssl):
    """
    Record how this manager can be reached, merging the values into
    self.connectionInfo under the keys host, port and use_ssl.
    """
    self.connectionInfo.update({'host': host,
                                'port': port,
                                'use_ssl': use_ssl})
263
def getConfiguration(self):
    """
    Export the manager's planet state as an XML configuration string,
    using config.exportPlanetXml.

    @rtype: str
    """
    planetState = self.state
    return config.exportPlanetXml(planetState)
269
def getBundlerBasket(self):
    """
    Return a bundler basket to unbundle from.

    If the registry reports that a rebuild is needed, the registry is
    force-verified and the basket is rebuilt first.

    @since: 0.2.2
    @rtype: L{flumotion.common.bundle.BundlerBasket}
    """
    if not registry.getRegistry().rebuildNeeded():
        return self.bundlerBasket

    self.info("Registry changed, rebuilding")
    # getRegistry() is deliberately called again after verify() rather
    # than cached in a local
    registry.getRegistry().verify(force=True)
    self.bundlerBasket = registry.getRegistry().makeBundlerBasket()
    return self.bundlerBasket
284
def addMessage(self, level, id, format, *args, **kwargs):
    """
    Convenience method to construct a message and add it to the planet
    state.

    `format' should be marked as translatable in the source with N_;
    *args are stored as its format arguments. Keyword arguments are
    passed on to the message constructor. See
    L{flumotion.common.messages.Message} for the meaning of the other
    arguments.

    For example::

        self.addMessage(messages.WARNING, 'foo-warning',
                        N_('The answer is %d'), 42, debug='not really')
    """
    translatable = T_(format, *args)
    message = messages.Message(level, translatable, id=id, **kwargs)
    self.addMessageObject(message)
302
def addMessageObject(self, message):
    """
    Store a message in the planet state's 'messages', keyed by the
    message's id.

    @type message: L{flumotion.common.messages.Message}
    """
    key = message.id
    self.state.setitem('messages', key, message)
310
def clearMessage(self, mid):
    """
    Remove the message with the given ID from the planet state, if one
    is present; otherwise do nothing.

    @type mid: message ID, normally a str
    """
    current = self.state.get('messages')
    if mid not in current:
        return
    self.state.delitem('messages', mid)
320
def adminAction(self, identity, message, args, kw):
    """
    Notify every AdminAction plug of an action.

    @param identity: L{flumotion.common.identity.Identity}
    """
    socket = 'flumotion.component.plugs.adminaction.AdminAction'
    for plug in self.plugs.get(socket, []):
        plug.action(identity, message, args, kw)
329
def computeIdentity(self, keycard, remoteHost):
    """
    Compute a suitable identity for a remote host.

    First looks for a
    flumotion.component.plugs.identity.IdentityProvider plug installed
    on the manager; the first one returning a true value wins. Otherwise
    falls back to user@host.

    The identity is only used in the adminaction interface. For example,
    an adminaction plug may check an admin's privileges before doing an
    action, and the identity object computed here might store those
    privileges.

    @param keycard:    the keycard that the remote host used to log in.
    @type  keycard:    L{flumotion.common.keycards.Keycard}
    @param remoteHost: the ip of the remote host
    @type  remoteHost: str

    @rtype: a deferred that will fire a
            L{flumotion.common.identity.RemoteIdentity}
    """
    socket = 'flumotion.component.plugs.identity.IdentityProvider'
    for plug in self.plugs.get(socket, []):
        identity = plug.computeIdentity(keycard, remoteHost)
        if identity:
            # Callers (e.g. Dispatcher.requestAvatar) chain callbacks on
            # the result, so a plug that returns a plain identity must
            # be wrapped. A deferred is passed through untouched.
            if isinstance(identity, defer.Deferred):
                return identity
            return defer.succeed(identity)
    username = getattr(keycard, 'username', None)
    return defer.succeed(RemoteIdentity(username, remoteHost))
360
def _addComponent(self, conf, parent, identity):
    """
    Add a component state for the given component config entry.

    The new state starts out sleeping and is appended to the parent's
    'components'. A warning message is attached when the configured
    Flumotion version is incompatible with the running one. The state is
    registered in the component mappers under both the state and its
    avatarId.

    @param conf:     the component config entry
    @param parent:   the parent state (atmosphere or flow)
    @param identity: identity requesting the addition; anything other
                     than LOCAL_IDENTITY is reported to adminaction plugs
    @rtype: L{flumotion.common.planet.ManagerComponentState}
    """
    self.debug('adding component %s to %s', conf.name, parent.get('name'))

    if identity != LOCAL_IDENTITY:
        self.adminAction(identity, '_addComponent', (conf, parent), {})

    # build the config dict once instead of once per use
    confDict = conf.getConfigDict()
    avatarId = confDict['avatarId']

    state = planet.ManagerComponentState()
    state.set('name', conf.name)
    state.set('type', conf.getType())
    state.set('workerRequested', conf.worker)
    state.setMood(moods.sleeping.value)
    state.set('config', confDict)

    state.set('parent', parent)
    parent.append('components', state)

    self.clearMessage('loadComponent-%s' % avatarId)

    # FIXME: don't use configure.versionTuple, get the appropriate
    # version for conf['package']
    if not common.checkVersionsCompat(confDict['version'],
                                      configure.versionTuple):
        m = messages.Warning(T_(N_("This component is configured for "
            "Flumotion version %s, but you are running version %s.\n"
            "Please update the configuration of the component.\n"),
            common.versionTupleToString(confDict['version']),
            configure.version))
        state.append('messages', m)

    # add to mapper
    m = ComponentMapper()
    m.state = state
    m.id = avatarId
    self._componentMappers[state] = m
    self._componentMappers[avatarId] = m

    return state
407
def _updateStateFromConf(self, _, conf, identity):
    """
    Merge a new config object into the planet state.

    Components whose name is not yet present in their parent (the
    atmosphere or a flow) are added. Missing flows are created.
    Components that already exist are left alone; a warning message is
    posted when their configuration differs from the new one.

    @param _:        result of the previous deferred callback; ignored
    @param conf:     the parsed configuration
    @param identity: identity requesting the change, passed on to
                     _addComponent
    @returns: a list of all components added
    @rtype:   list of L{flumotion.common.planet.ManagerComponentState}
    """

    self.debug('syncing up planet state with config')
    added = [] # added components while parsing

    def checkNotRunning(comp, parentState):
        # True when comp is not yet in parentState and should be added.
        name = comp.getName()

        comps = dict([(x.get('name'), x)
                      for x in parentState.get('components')])
        if name not in comps:
            return True

        # if we get here, the component is already running; warn if
        # the running configuration is different. Return False in
        # all cases.
        parent = comps[name].get('parent').get('name')
        # use our own argument; this used to read the caller's loop
        # variable through the closure and only worked by accident
        newConf = comp.getConfigDict()
        oldConf = comps[name].get('config')

        if newConf == oldConf:
            self.debug('%s already has component %s running with '
                       'same configuration', parent, name)
            self.clearMessage('loadComponent-%s' % oldConf['avatarId'])
            return False

        self.info('%s already has component %s, but configuration '
                  'not the same -- notifying admin', parent, name)

        diff = config.dictDiff(oldConf, newConf)
        diffMsg = config.dictDiffMessageString(diff, 'existing', 'new')

        self.addMessage(messages.WARNING,
                        'loadComponent-%s' % oldConf['avatarId'],
                        N_('Could not load component %r into %r: '
                           'a component is already running with '
                           'this name, but has a different '
                           'configuration.'), name, parent,
                        debug=diffMsg)
        return False

    state = self.state
    atmosphere = state.get('atmosphere')
    for compConf in conf.atmosphere.components.values():
        if checkNotRunning(compConf, atmosphere):
            added.append(self._addComponent(compConf, atmosphere, identity))

    flows = dict([(x.get('name'), x) for x in state.get('flows')])
    for f in conf.flows:
        if f.name in flows:
            flow = flows[f.name]
        else:
            self.info('creating flow %r', f.name)
            flow = planet.ManagerFlowState(name=f.name, parent=state)
            state.append('flows', flow)

        for compConf in f.components.values():
            if checkNotRunning(compConf, flow):
                added.append(self._addComponent(compConf, flow, identity))

    return added
476 - def _startComponents(self, components, identity):
477 # now start all components that need starting -- collecting into 478 # an temporary dict of the form {workerId => [components]} 479 componentsToStart = {} 480 for c in components: 481 workerId = c.get('workerRequested') 482 if not workerId in componentsToStart: 483 componentsToStart[workerId] = [] 484 componentsToStart[workerId].append(c) 485 self.debug('_startComponents: componentsToStart %r' % componentsToStart) 486 487 for workerId, componentStates in componentsToStart.items(): 488 self._workerCreateComponents(workerId, componentStates)
489
def _loadComponentConfiguration(self, conf, identity):
    """
    Merge conf into the planet state and start the components that
    were added.

    @returns: a deferred running _updateStateFromConf and then
              _startComponents on its result
    """
    # _updateStateFromConf ignores the incoming None and returns the list
    # of added component states, which _startComponents then consumes.
    # addCallback returns the same deferred, so the calls can be chained.
    return (defer.succeed(None)
            .addCallback(self._updateStateFromConf, conf, identity)
            .addCallback(self._startComponents, identity))
496
497 - def loadComponentConfigurationXML(self, file, identity):
498 """ 499 Load the configuration from the given XML, merging it on top of 500 the currently running configuration. 501 502 @param file: file to parse, either as an open file object, 503 or as the name of a file to open 504 @type file: str or file 505 @param identity: The identity making this request.. This is used by the 506 adminaction logging mechanism in order to say who is 507 performing the action. 508 @type identity: L{flumotion.common.identity.Identity} 509 """ 510 self.debug('loading configuration') 511 mid = 'loadComponent-parse-error' 512 if isinstance(file, str): 513 mid += '-%s' % file 514 try: 515 self.clearMessage(mid) 516 conf = config.FlumotionConfigXML(file) 517 conf.parse() 518 return self._loadComponentConfiguration(conf, identity) 519 except errors.ConfigError, e: 520 self.addMessage(messages.WARNING, mid, 521 N_('Invalid component configuration.'), 522 debug=e.args[0]) 523 return defer.fail(e) 524 except errors.UnknownComponentError, e: 525 if isinstance(file, str): 526 debug = 'Configuration loaded from file %r' % file 527 else: 528 debug = 'Configuration loaded remotely' 529 self.addMessage(messages.WARNING, mid, 530 N_('Unknown component in configuration: %s.'), 531 e.args[0], debug=debug) 532 return defer.fail(e) 533 except Exception, e: 534 self.addMessage(messages.WARNING, mid, 535 N_('Unknown error while loading configuration.'), 536 debug=log.getExceptionMessage(e)) 537 return defer.fail(e)
538
def _loadManagerPlugs(self, conf):
    """
    Instantiate the plugs listed in conf.plugs and append them to
    self.plugs under their socket.
    """
    for socket, plugArgsList in conf.plugs.items():
        socketPlugs = self.plugs.setdefault(socket, [])

        for args in plugArgsList:
            self.debug('loading plug type %s for socket %s'
                       % (args['type'], socket))
            entry = registry.getRegistry().getPlug(args['type']).getEntry()
            # construction errors are re-raised as errors.ConfigError
            plug = reflectcall.reflectCallCatching(
                errors.ConfigError, entry.getModuleName(),
                entry.getFunction(), args)
            socketPlugs.append(plug)
555
def startManagerPlugs(self):
    """Start every loaded manager plug, passing it this manager."""
    for socket, socketPlugs in self.plugs.items():
        for plug in socketPlugs:
            self.debug('starting plug %r for socket %s', plug, socket)
            plug.start(self)
561
def _loadManagerBouncer(self, conf):
    """
    Create the manager bouncer described in conf and install it via
    setBouncer once it is happy.

    @returns: a deferred. If conf defines no bouncer, it has already
              fired. If the bouncer fails to become happy, the failure
              is logged and swallowed (best effort); the manager keeps
              running without that bouncer.
    """
    if not conf.bouncer:
        self.warning('no bouncer defined, nothing can access the '
                     'manager')
        return defer.succeed(None)

    self.debug('going to start manager bouncer %s of type %s',
               conf.bouncer.name, conf.bouncer.type)

    defs = registry.getRegistry().getComponent(conf.bouncer.type)
    entry = defs.getEntryByType('component')
    # FIXME: use entry.getModuleName() (doesn't work atm?)
    moduleName = defs.getSource()
    methodName = entry.getFunction()
    bouncer = reflectcall.createComponent(moduleName, methodName,
                                          conf.bouncer.getConfigDict())
    d = bouncer.waitForHappy()

    def setupCallback(result):
        bouncer.debug('started')
        self.setBouncer(bouncer)

    def setupErrback(failure):
        # Swallow deliberately, but say why: previously the failure
        # was dropped without any detail.
        self.warning('Error starting manager bouncer: %s',
                     log.getFailureMessage(failure))

    d.addCallbacks(setupCallback, setupErrback)
    return d
def loadManagerConfigurationXML(self, file):
    """
    Load manager configuration from the given XML.

    The manager configuration is currently used to load the manager's
    bouncer and plugs, and is only run once at startup.

    @param file: file to parse, either as an open file object,
                 or as the name of a file to open
    @type  file: str or file
    @returns: the deferred from loading the bouncer (callers that
              ignored the previous None return are unaffected)
    """
    self.debug('loading configuration')
    conf = config.ManagerConfigParser(file)
    try:
        conf.parseBouncerAndPlugs()
        self._loadManagerPlugs(conf)
        # the bouncer's config is read synchronously, so unlinking
        # before the deferred fires is safe
        return self._loadManagerBouncer(conf)
    finally:
        # release the parsed document even if parsing or loading raised
        conf.unlink()
__pychecker__ = 'maxargs=11' # hahaha
def loadComponent(self, identity, componentType, componentId,
                  componentLabel, properties, workerName,
                  plugs, eaters, isClockMaster, virtualFeeds):
    """
    Load a component into the manager configuration and start it.

    See L{flumotion.manager.admin.AdminAvatar.perspective_loadComponent}
    for a definition of the argument types.

    @raises NotImplementedError: for clock-master components and for
            components that need synchronization
    @raises errors.ConfigError: if no worker name was given
    @raises errors.ComponentAlreadyExistsError: if the parent already
            has a component with this name
    @rtype: L{flumotion.common.planet.ManagerComponentState}
    """
    self.debug('loading %s component %s on %s',
               componentType, componentId, workerName)
    parentName, compName = common.parseComponentId(componentId)

    if isClockMaster:
        raise NotImplementedError("Clock master components are not "
                                  "yet supported")
    # Check the workerName argument: the bare name 'worker' refers to the
    # flumotion.manager.worker module and is never None.
    if workerName is None:
        raise errors.ConfigError("Component %r needs to specify the"
                                 " worker on which it should run"
                                 % componentId)

    state = self.state

    compConf = config.ConfigEntryComponent(compName, parentName,
                                           componentType,
                                           componentLabel,
                                           properties,
                                           plugs, workerName,
                                           eaters, isClockMaster,
                                           None, None, virtualFeeds)

    if compConf.defs.getNeedsSynchronization():
        raise NotImplementedError("Components that need "
                                  "synchronization are not yet "
                                  "supported")

    if parentName == 'atmosphere':
        parentState = state.get('atmosphere')
    else:
        flows = dict([(x.get('name'), x) for x in state.get('flows')])
        if parentName in flows:
            parentState = flows[parentName]
        else:
            self.info('creating flow %r', parentName)
            parentState = planet.ManagerFlowState(name=parentName,
                                                  parent=state)
            state.append('flows', parentState)

    components = [x.get('name') for x in parentState.get('components')]
    if compName in components:
        self.debug('%r already has component %r', parentName, compName)
        raise errors.ComponentAlreadyExistsError(compName)

    compState = self._addComponent(compConf, parentState, identity)

    self._startComponents([compState], identity)

    return compState
664
def _createHeaven(self, interface, klass):
    """
    Instantiate a heaven of the given class for this manager and
    register it with the dispatcher for the given medium interface.

    @param interface: the medium interface to create a heaven for
    @type  interface: L{flumotion.common.interfaces.IMedium}
    @param klass:     the type of heaven to create
    @type  klass:     an implementor of L{flumotion.common.interfaces.IHeaven}
    @returns: the new heaven
    """
    assert issubclass(interface, interfaces.IMedium)
    newHeaven = klass(self)
    self.dispatcher.registerHeaven(newHeaven, interface)
    return newHeaven
679
def setBouncer(self, bouncer):
    """
    Install the bouncer on the manager, its portal and its dispatcher,
    warning (but proceeding) when one was already set.

    @type bouncer: L{flumotion.component.bouncers.bouncer.Bouncer}
    """
    if self.bouncer:
        self.warning("manager already had a bouncer, setting anyway")

    # chained assignment: self.bouncer first, then the portal's
    self.bouncer = self.portal.bouncer = bouncer
    self.dispatcher.setBouncer(bouncer)
690
def getFactory(self):
    """Return the PB server factory built for this manager's portal."""
    factory = self.factory
    return factory
693
def componentCreate(self, componentState):
    """
    Create the given component. This currently also triggers a start
    later on, when the component avatar attaches.

    The component must be sleeping with no pending mood, and the worker
    it should run on (workerName, else workerRequested) must be logged in.

    @raises errors.ComponentMoodError: on a wrong or pending mood
    @raises errors.ComponentNoWorkerError: if the worker is not logged in
    """
    mood = componentState.get('mood')
    if mood != moods.sleeping.value:
        raise errors.ComponentMoodError("%r not sleeping but %s" % (
            componentState, moods.get(mood).name))

    pending = componentState.get('moodPending')
    if pending is not None:
        raise errors.ComponentMoodError(
            "%r already has a pending mood %s" % (
                componentState, moods.get(pending).name))

    # find a worker this component can start on
    workerId = (componentState.get('workerName')
                or componentState.get('workerRequested'))

    if workerId not in self.workerHeaven.avatars:
        raise errors.ComponentNoWorkerError(
            "worker %s is not logged in" % workerId)
    return self._workerCreateComponents(workerId, [componentState])
722
def _componentStopNoAvatar(self, componentState, avatarId):
    """
    Stop a component that has no avatar logged in.

    - sad: set back to sleeping with no pending mood.
    - lost: if the requested worker is logged in, ask it which components
      it runs. If the job is still running, only warn; otherwise set the
      component to sleeping. If the worker cannot be asked, the failure
      is logged and passed on to the caller.
    - any other mood: fail with errors.ComponentMoodError.

    @returns: a deferred
    """
    # NB: reset moodPending if asked to stop without an avatar
    # because we changed above to allow stopping even if moodPending
    # is happy
    def stopSad():
        # FIXME: clear messages?
        self.debug('asked to stop a sad component without avatar')
        componentState.setMood(moods.sleeping.value)
        componentState.set('moodPending', None)
        return defer.succeed(None)

    def stopLost():
        def gotComponents(comps):
            return avatarId in comps

        def gotError(failure):
            # This used to be a placeholder ('sdfa') that raised
            # NameError. We cannot tell whether the job is still running,
            # so log it and let the failure reach the caller instead of
            # guessing.
            self.warning('could not get components from worker while '
                         'stopping lost component %r: %s', avatarId,
                         log.getFailureMessage(failure))
            return failure

        def gotJobRunning(running):
            if running:
                self.warning('asked to stop lost component %r, but '
                             'it is still running', avatarId)
                # FIXME: put a message on the state to suggest a
                # kill?
            else:
                self.debug('component %r seems to be really lost, '
                           'setting to sleeping', avatarId)
                componentState.setMood(moods.sleeping.value)
                componentState.set('moodPending', None)

        self.debug('asked to stop a lost component without avatar')
        workerName = componentState.get('workerRequested')
        if workerName and self.workerHeaven.hasAvatar(workerName):
            self.debug('checking if component has job process running')
            d = self.workerHeaven.getAvatar(workerName).getComponents()
            d.addCallbacks(gotComponents, gotError)
            d.addCallback(gotJobRunning)
            return d
        else:
            self.debug('component lacks a worker, setting to sleeping')
            d = defer.maybeDeferred(gotJobRunning, False)
            return d

    def stopUnknown():
        msg = ('asked to stop a component without avatar in mood %s'
               % moods.get(mood))
        self.warning(msg)
        return defer.fail(errors.ComponentMoodError(msg))

    mood = componentState.get('mood')
    stoppers = {moods.sad.value: stopSad,
                moods.lost.value: stopLost}
    return stoppers.get(mood, stopUnknown)()
def _componentStopWithAvatar(self, componentState, componentAvatar):
    """
    Stop a component through its avatar, then disconnect the avatar.
    If the component was sad, set it back to sleeping afterwards.

    @returns: a deferred firing with the result of the disconnect
    """
    def disconnectAvatar(_):
        return componentAvatar.disconnect()

    def clearSadMood(result):
        if componentState.get('mood') == moods.sad.value:
            self.debug('clearing sad mood after having stopped component')
            componentState.setMood(moods.sleeping.value)
        return result

    # addCallback returns the same deferred, so the steps can be chained
    return (componentAvatar.stop()
            .addCallback(disconnectAvatar)
            .addCallback(clearSadMood))
def componentStop(self, componentState):
    """
    Stop the given component.

    If the component was sad, its sad state is cleared as well, since
    the admin explicitly requested the stop.

    @type  componentState: L{planet.ManagerComponentState}
    @rtype: L{twisted.internet.defer.Deferred}
    @raises errors.BusyComponentError: if a mood other than happy is
            pending
    @raises errors.UnknownComponentError: if no mapper exists for the
            state
    """
    self.debug('componentStop(%r)', componentState)
    # Stopping is allowed even while happy is pending, so that a
    # component that never reaches happy can still be stopped.
    pending = componentState.get('moodPending')
    if pending not in (None, moods.happy.value):
        self.debug("Pending mood is %r", pending)
        raise errors.BusyComponentError(componentState)

    mapper = self.getComponentMapper(componentState)
    if not mapper:
        # We have a stale componentState for an already-deleted
        # component
        self.warning("Component mapper for component state %r doesn't "
                     "exist", componentState)
        raise errors.UnknownComponentError(componentState)
    if mapper.avatar:
        return self._componentStopWithAvatar(componentState, mapper.avatar)
    return self._componentStopNoAvatar(componentState, mapper.id)
821
def componentAddMessage(self, avatarId, message):
    """
    Append a message to the state of the component mapped under
    avatarId, e.g. when a worker reports a crashed component. An
    error-level message also makes the component sad. Unknown avatarIds
    are only warned about.
    """
    if avatarId not in self._componentMappers:
        self.warning('asked to set a message on non-mapped component %s' %
                     avatarId)
        return

    compState = self._componentMappers[avatarId].state
    compState.append('messages', message)
    if message.level != messages.ERROR:
        return
    self.debug('Error message makes component sad')
    compState.setMood(moods.sad.value)
# FIXME: unify naming of stuff like this
def workerAttached(self, workerAvatar):
    """
    Called when a worker logs in.

    Once the worker reports which components it runs, this creates the
    components that should run on it (requested for this worker, or
    with no worker requested) and are not already running there. It
    also restarts this worker's lost components that the worker does
    not report. Finally, componentHeaven.feedServerAvailable is
    scheduled for the worker on the next reactor iteration.
    """
    workerId = workerAvatar.avatarId
    self.debug('vishnu.workerAttached(): id %s' % workerId)

    # Creation order is unimportant; only the order of starting
    # matters, and that is handled by different code.
    toCreate = [c for c in self._getComponentsToCreate()
                if c.get('workerRequested') in (workerId, None)]

    def gotWorkerComponents(workerComponents):
        lost = [c for c in self.getComponentStates()
                if c.get('workerRequested') == workerId
                and c.get('mood') == moods.lost.value]

        # workerComponents holds avatarId strings; drop whatever the
        # worker is already running from both lists
        for runningId in workerComponents:
            if runningId not in self._componentMappers:
                continue
            compState = self._componentMappers[runningId].state
            if compState in toCreate:
                toCreate.remove(compState)
            if compState in lost:
                lost.remove(compState)

        for compState in lost:
            self.info(
                "Restarting previously lost component %s on worker %s",
                self._componentMappers[compState].id, workerId)
            # Go through sleeping first so that a newly-started
            # component can be told apart from a lost component
            # logging back in.
            compState.set('moodPending', None)
            compState.setMood(moods.sleeping.value)

        everything = toCreate + lost
        if not everything:
            self.debug(
                "vishnu.workerAttached(): no components for this worker")
            return

        self._workerCreateComponents(workerId, everything)

    d = workerAvatar.getComponents()
    d.addCallback(gotWorkerComponents)

    reactor.callLater(0, self.componentHeaven.feedServerAvailable,
                      workerId)
def _workerCreateComponents(self, workerId, components):
    """
    Create the list of components on the given worker, in no specific
    order.

    Each creation is queued as a callback on one deferred, which is
    fired immediately. If the worker is not logged in yet, nothing is
    done.

    @param workerId:   avatarId of the worker
    @type  workerId:   string
    @param components: components to start
    @type  components: list of
                       L{flumotion.common.planet.ManagerComponentState}
    @returns: a deferred
    """
    self.debug("_workerCreateComponents: workerId %r, components %r" % (
        workerId, components))

    if workerId not in self.workerHeaven.avatars:
        self.debug('worker %s not logged in yet, delaying '
                   'component start' % workerId)
        return defer.succeed(None)

    workerAvatar = self.workerHeaven.avatars[workerId]

    chain = defer.Deferred()

    for compState in components:
        compType = compState.get('type')
        compConf = compState.get('config')
        self.debug('scheduling create of %s on %s'
                   % (compConf['avatarId'], workerId))
        chain.addCallback(self._workerCreateComponentDelayed,
                          workerAvatar, compState, compType, compConf)

    chain.addCallback(lambda _: self.debug(
        '_workerCreateComponents(): completed setting up create chain'))

    # now trigger the chain
    self.debug('_workerCreateComponents(): triggering create chain')
    chain.callback(None)
    return chain
932
def _workerCreateComponentDelayed(self, result, workerAvatar,
                                  componentState, componentType, conf):
    """
    Ask a worker to create one component.

    Used as one step of the Deferred chain built by
    L{_workerCreateComponents}.  The component's moodPending is set to
    happy before the remote create is issued.
    L{_createCallback} and L{_createErrback} are attached to the result.

    @param result:         result of the previous chain step (ignored)
    @param workerAvatar:   avatar of the worker to create the component on
    @param componentState: state of the component to create
    @param componentType:  type of the component to create
    @param conf:           component config; must contain 'avatarId' and
                           may contain 'nice' (0 when absent)

    @returns: None.  The create deferred is not returned, so the chain this
              step belongs to does not wait for the creation to complete.
    """
    avatarId = conf['avatarId']
    niceLevel = conf.get('nice', 0)

    # moodPending = happy makes sure this component only gets asked to
    # start once
    componentState.set('moodPending', moods.happy.value)

    # FIXME: the callback receives the avatarId of the component we wanted
    # started; it should be attached to the planetState's component state
    created = workerAvatar.createComponent(avatarId, componentType,
                                           niceLevel, conf)
    created.addCallback(self._createCallback, componentState)
    created.addErrback(self._createErrback, componentState)

    # FIXME: shouldn't we return the deferred here to make sure components
    # wait on each other to be started ?
def _createCallback(self, result, componentState):
    """
    Success callback for a remote component creation.

    Checks that the avatarId reported back by the worker is the id of the
    mapper registered for C{componentState}.

    @param result:         avatarId returned by the worker
    @param componentState: state of the component that was created

    @raises KeyError:       if no mapper is registered for componentState
    @raises AssertionError: if the ids differ (only when asserts are on)
    """
    self.debug('got avatarId %s for state %s' % (result, componentState))
    expectedId = self._componentMappers[componentState].id
    assert result == expectedId, \
        "received id %s is not the expected id %s" % (result, expectedId)
958
def _createErrback(self, failure, state):
    """
    Errback for a failed remote component creation.

    Always logs a warning first.  Then:
      - any failure other than ComponentAlreadyRunningError: the component
        is made sad and an error message is appended to its state;
      - ComponentAlreadyRunningError with a jobState already known for the
        component: only an info message is logged;
      - ComponentAlreadyRunningError without a jobState: the component is
        marked lost until it logs in.

    Returns None, so the failure does not propagate further.

    @param failure: the creation failure
    @param state:   state of the component that could not be created
    """
    # FIXME: make ConfigError copyable so we can .check() it here
    # and print a nicer warning
    self.warning('failed to create component %s: %s',
                 state.get('name'), log.getFailureMessage(failure))

    if not failure.check(errors.ComponentAlreadyRunningError):
        message = messages.Error(
            T_(N_("The component could not be started.")),
            debug=log.getFailureMessage(failure))
        state.setMood(moods.sad.value)
        state.append('messages', message)
        return None

    if self._componentMappers[state].jobState:
        self.info('component appears to have logged in in the meantime')
    else:
        self.info('component appears to be running already; '
                  'treating it as lost until it logs in')
        state.setMood(moods.lost.value)
    return None
982
def workerDetached(self, workerAvatar):
    """
    Notification that a worker has logged out.

    Currently this only emits a debug message; no state is touched.

    @param workerAvatar: avatar of the worker that logged out
    """
    self.debug('vishnu.workerDetached(): id %s' % workerAvatar.avatarId)
987
def addComponentToFlow(self, componentState, flowName):
    """
    Attach a component state to the named flow, creating the flow if it
    does not exist yet.

    The name 'atmosphere' is special: the component is put in the planet's
    atmosphere, which is handled like a flow here although it is not one.

    @param componentState: state of the component to attach
    @param flowName:       name of the flow, or 'atmosphere'
    """
    if flowName != 'atmosphere':
        flow = _first(self.state.get('flows'),
                      lambda candidate: candidate.get('name') == flowName)
        if not flow:
            self.info('Creating flow "%s"' % flowName)
            flow = planet.ManagerFlowState()
            flow.set('name', flowName)
            flow.set('parent', self.state)
            self.state.append('flows', flow)
    else:
        flow = self.state.get('atmosphere')

    componentState.set('parent', flow)
    flow.append('components', componentState)
1005
def registerComponent(self, componentAvatar):
    """
    Record a logged-in component avatar in the component mappers.

    The mapper already known under the avatar's avatarId is reused, or a
    new L{ComponentMapper} is created.  Its state, jobState, id and avatar
    are filled from C{componentAvatar}, and the mapper is then indexed
    under each of those four keys.

    @param componentAvatar: the avatar of the component that logged in
    """
    avatarId = componentAvatar.avatarId
    mapper = self.getComponentMapper(avatarId)
    if not mapper:
        mapper = ComponentMapper()

    mapper.state = componentAvatar.componentState
    mapper.jobState = componentAvatar.jobState
    mapper.id = avatarId
    mapper.avatar = componentAvatar

    for key in (mapper.state, mapper.jobState, mapper.id, mapper.avatar):
        self._componentMappers[key] = mapper
1020
def unregisterComponent(self, componentAvatar):
    """
    Clean up after a component avatar logs out.

    The mapper entries keyed by the component's jobState and avatar are
    dropped.  The mapper's jobState and avatar attributes are cleared.
    The entries keyed by the component state and avatarId stay in place.
    The component state's 'pid', 'workerName' and 'moodPending' are reset
    to None.  An avatar that was never registered is logged and ignored.

    @param componentAvatar: avatar of the component that is logging out
    """
    self.debug('unregisterComponent(%r): cleaning up state' %
               componentAvatar)

    mappers = self._componentMappers
    if componentAvatar not in mappers:
        self.warning("Component logging out that was incompletely logged "
                     " in, ignoring")
        return
    mapper = mappers[componentAvatar]

    # forget the jobState
    try:
        del mappers[mapper.jobState]
    except KeyError:
        self.warning('Could not remove jobState for %r' % componentAvatar)
    mapper.jobState = None

    state = mapper.state
    for key in ('pid', 'workerName', 'moodPending'):
        state.set(key, None)

    # forget the avatar
    del mappers[mapper.avatar]
    mapper.avatar = None
1048
def getComponentStates(self):
    """
    Return the states of all components in the planet.

    Each state is written to the log at log level.  A warning is emitted
    for any component whose mood is None.

    @rtype:   list of L{flumotion.common.planet.ManagerComponentState}
    @returns: the list returned by C{self.state.getComponents()}
    """
    cList = self.state.getComponents()
    self.debug('getComponentStates(): %d components' % len(cList))
    for c in cList:
        self.log(repr(c))
        # identity test for None (PEP 8), not equality
        if c.get('mood') is None:
            self.warning('%s has mood None' % c.get('name'))

    return cList
1059
def deleteComponent(self, componentState):
    """
    Empty the planet of the given component.

    The component must be known to the manager and idle: sleeping, with
    no mood change pending.  Its mapper entries (keyed by the state and by
    its avatarId) are dropped, and the state is removed from its parent
    flow.

    @param componentState: state of the component to delete

    @raises errors.UnknownComponentError: if the component has no mapper
    @raises errors.BusyComponentError:    if the component is not sleeping
                                          or has a mood change pending
    @returns: a deferred that will fire when all listeners have been
              notified of the removal of the component.
    """
    self.debug('deleting component %r from state', componentState)
    c = componentState
    if c not in self._componentMappers:
        raise errors.UnknownComponentError(c)

    flow = c.get('parent')
    # Compare moods by equality, not identity: two equal mood values are
    # not guaranteed to be the same object.
    if (c.get('moodPending') is not None
            or c.get('mood') != moods.sleeping.value):
        raise errors.BusyComponentError(c)

    mapper = self._componentMappers[c]
    del self._componentMappers[mapper.id]
    del self._componentMappers[c]
    return flow.remove('components', c)
1080
def deleteFlow(self, flowName):
    """
    Empty the planet of a flow.

    First checks whether any component of the flow is busy (moodPending
    set) or not sleeping; the two checks are combined with C{_fint}
    (presumably "either condition" -- confirm against its definition).
    If none is, the mapper entries of all the flow's components are
    dropped.  The flow is then emptied and removed from the planet.

    @param flowName: name of the flow to delete

    @raises errors.BusyComponentError: if a component of the flow fails
                                       the idle check
    @returns: a deferred that will fire when the flow is removed.
    """
    flow = _find(self.state.get('flows'), flowName, lambda x: x.get('name'))
    components = flow.get('components')

    # Refuse to act if any component is in a mood change/command or not
    # asleep.  Moods are compared by equality: equal values need not be
    # the same object, so an identity test could wrongly report "busy".
    isBusy = lambda c: c.get('moodPending') is not None
    isNotSleeping = lambda c: c.get('mood') != moods.sleeping.value
    pred = _fint(isBusy, isNotSleeping)
    if _any(components, pred):
        raise errors.BusyComponentError(_first(components, pred))

    for c in components:
        del self._componentMappers[self._componentMappers[c].id]
        del self._componentMappers[c]
    d = flow.empty()
    d.addCallback(lambda _: self.state.remove('flows', flow))
    return d
1105
def emptyPlanet(self):
    """
    Empty the planet of all components, and flows. Also clears all
    messages.

    Fails if any component has a mood change pending.  Otherwise every
    component that is not sleeping and still has an avatar is asked to
    stop.  A non-sleeping component without an avatar is expected to be
    sad or lost.  After the stop steps, L{_emptyPlanetCallback} removes
    the remaining planet state.  The chain is triggered from the reactor
    after this method returns.

    @raises errors.BusyComponentError: if a component has moodPending set
    @returns: a deferred that will fire when the planet is empty.
    """
    # Snapshot the ids first: clearMessage may remove entries from the
    # very mapping we would otherwise be iterating over.
    for mid in list(self.state.get('messages').keys()):
        self.clearMessage(mid)

    # if any component is already in a mood change/command, fail
    pending = [c for c in self.getComponentStates()
               if c.get('moodPending') is not None]
    if pending:
        state = pending[0]
        raise errors.BusyComponentError(state,
            "moodPending is %s" % moods.get(state.get('moodPending')))

    # the ones that aren't sleeping need to be stopped; moods are compared
    # by equality since equal values need not be the same object
    sleeping = moods.sleeping.value
    components = [c for c in self.getComponentStates()
                  if c.get('mood') != sleeping]

    # create a big deferred for stopping everything
    d = defer.Deferred()

    self.debug('need to stop %d components: %r' % (
        len(components), components))

    # FIXME: we should shut components down in the correct order (according
    # to the dependency graph); this uses an undefined ordering.
    for c in components:
        avatar = self._componentMappers[c].avatar
        # If this has logged out, but isn't sleeping (so is sad or lost),
        # we won't have an avatar. So, stop it if we can.
        if avatar:
            # avatar is passed as an argument, so each step stops its own
            d.addCallback(lambda result, a: a.stop(), avatar)
        else:
            assert c.get('mood') in (moods.sad.value, moods.lost.value)

    d.addCallback(self._emptyPlanetCallback)

    # trigger the deferred after returning
    reactor.callLater(0, d.callback, None)

    return d
1156
def _emptyPlanetCallback(self, result):
    """
    Remove the remaining planet state once all components have stopped.

    Drops the mapper entries of every component (keyed by state and by
    avatarId), warns about non-sleeping components and leftover mapper
    keys, empties the atmosphere and every flow, and removes each flow
    from the planet.

    @param result: result of the preceding stop chain (ignored)
    @returns: a L{defer.DeferredList} over all empty/remove operations
    """
    components = self.getComponentStates()
    self.debug('_emptyPlanetCallback: need to delete %d components' %
               len(components))

    # moods compared by equality: equal values need not be identical
    sleeping = moods.sleeping.value
    for c in components:
        if c.get('mood') != sleeping:
            self.warning('Component %s is not sleeping', c.get('name'))
        # clear mapper; remove componentstate and id
        m = self._componentMappers[c]
        del self._componentMappers[m.id]
        del self._componentMappers[c]

    # if anything's left, we have a mistake somewhere
    l = list(self._componentMappers.keys())
    if l:
        self.warning('mappers still has keys %r' % (repr(l)))

    dList = [self.state.get('atmosphere').empty()]

    # Iterate over a snapshot: the loop removes each flow from the
    # collection returned by get('flows').  If that is the live list,
    # removing from it while iterating would skip every other flow.
    for f in list(self.state.get('flows')):
        self.debug('appending deferred for emptying flow %r' % f)
        dList.append(f.empty())
        self.debug('appending deferred for removing flow %r' % f)
        dList.append(self.state.remove('flows', f))
        self.debug('appended deferreds')

    return defer.DeferredList(dList)
1190
def _getComponentsToCreate(self):
    """
    Return the components that currently have no job.

    Sleeping means there is no existing job: once a job is created the
    mood becomes waking, so moodPending does not need to be checked.

    @rtype: list of L{flumotion.common.planet.ManagerComponentState}
    """
    sleeping = moods.sleeping.value
    return [c for c in self.state.getComponents()
            if c.get('mood') == sleeping]
1205
def _getWorker(self, workerName):
    """
    Return the WorkerAvatar logged in under the given name.

    @param workerName: avatarId of the worker
    @raises errors.ComponentNoWorkerError: if no such worker is logged in
    """
    avatars = self.workerHeaven.avatars
    if workerName in avatars:
        return avatars[workerName]
    raise errors.ComponentNoWorkerError("Worker %s not logged in?"
                                        % workerName)
1213
def getWorkerFeedServerPort(self, workerName):
    """
    Return the feed server port of a logged-in worker.

    @param workerName: avatarId of the worker
    @returns: the worker's feedServerPort, or None if it is not logged in
    """
    if workerName not in self.workerHeaven.avatars:
        return None
    return self._getWorker(workerName).feedServerPort
1218
def reservePortsOnWorker(self, workerName, numPorts):
    """
    Reserve a number of ports on the worker named workerName.

    The ports stay reserved for the caller until releasePortsOnWorker is
    called for them.

    @param workerName: avatarId of the worker
    @param numPorts:   how many ports to reserve
    @returns: a list of ports as integers
    @raises errors.ComponentNoWorkerError: if the worker is not logged in
    """
    worker = self._getWorker(workerName)
    return worker.reservePorts(numPorts)
1228
1229 - def releasePortsOnWorker(self, workerName, ports):
1230 """ 1231 Tells the manager that the given ports are no longer being used, 1232 and may be returned to the allocation pool. 1233 """ 1234 try: 1235 return self._getWorker(workerName).releasePorts(ports) 1236 except errors.ComponentNoWorkerError, e: 1237 self.warning('could not release ports: %r' % e.args)
1238
def getComponentMapper(self, object):
    """
    Look up an object mapper given the object.

    The object may be any key the mappers are indexed under (component
    state, jobState, avatarId or avatar).  It must be hashable.

    @rtype: L{ComponentMapper} or None
    """
    # direct dict lookup: `in d.keys()` builds a list and scans it
    # linearly on Python 2
    return self._componentMappers.get(object)
1249
def getManagerComponentState(self, object):
    """
    Look up the manager-side component state for the given object.

    The object may be any key the component mappers are indexed under
    (component state, jobState, avatarId or avatar).  It must be hashable.

    @returns: the C{state} attribute of the matching mapper, or None if
              no mapper is registered for the object
    @rtype:   L{flumotion.common.planet.ManagerComponentState} or None
    """
    # direct dict lookup instead of a linear scan over d.keys()
    try:
        mapper = self._componentMappers[object]
    except KeyError:
        return None
    return mapper.state
1260