Package flumotion :: Package manager :: Module component
[hide private]

Source Code for Module flumotion.manager.component

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_manager_manager -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  manager-side objects for components 
 24   
 25  API Stability: semi-stable 
 26  """ 
 27   
 28  import time 
 29   
 30  from twisted.spread import pb 
 31  from twisted.internet import reactor, defer 
 32  from twisted.internet import error as terror 
 33  from twisted.python.failure import Failure 
 34  from zope.interface import implements 
 35   
 36  from flumotion.configure import configure 
 37  from flumotion.manager import base 
 38  from flumotion.common import errors, interfaces, keycards, log, config, planet 
 39  from flumotion.common import messages, common 
 40  from flumotion.twisted import flavors 
 41  from flumotion.common.planet import moods 
 42   
 43  from flumotion.common.messages import N_ 
 44  T_ = messages.gettexter('flumotion') 
 45   
46 -class ComponentAvatar(base.ManagerAvatar):
47 """ 48 I am a Manager-side avatar for a component. 49 I live in the L{ComponentHeaven}. 50 51 Each component that logs in to the manager gets an avatar created for it 52 in the manager. 53 54 @cvar avatarId: the L{componentId<common.componentId>} 55 @type avatarId: str 56 @cvar jobState: job state of this avatar's component 57 @type jobState: L{flumotion.common.planet.ManagerJobState} 58 @cvar componentState: component state of this avatar's component 59 @type componentState: L{flumotion.common.planet.ManagerComponentState} 60 """ 61 62 logCategory = 'comp-avatar' 63
64 - def __init__(self, heaven, avatarId, remoteIdentity, mind, conf, 65 jobState, clocking):
66 # doc in base class 67 base.ManagerAvatar.__init__(self, heaven, avatarId, 68 remoteIdentity, mind) 69 70 self.jobState = jobState 71 self.makeComponentState(conf) 72 self.clocking = clocking 73 74 self._shutdown_requested = False 75 76 self.vishnu.registerComponent(self) 77 # calllater to allow the component a chance to receive its 78 # avatar, so that it has set medium.remote 79 reactor.callLater(0, self.heaven.componentAttached, self)
80 81 ### python methods
82 - def __repr__(self):
83 mood = '(unknown)' 84 if self.componentState: 85 moodValue = self.componentState.get('mood') 86 if moodValue is not None: 87 mood = moods.get(moodValue).name 88 return '<%s %s (mood %s)>' % (self.__class__.__name__, 89 self.avatarId, mood)
90 91 ### ComponentAvatar methods
92 - def makeAvatarInitArgs(klass, heaven, avatarId, remoteIdentity, 93 mind):
94 def gotStates(result): 95 (_s1, conf), (_s2, jobState), (_s3, clocking) = result 96 assert _s1 and _s2 and _s3 # fireOnErrback=1 97 log.debug('component-avatar', 'got state information') 98 return (heaven, avatarId, remoteIdentity, mind, 99 conf, jobState, clocking)
100 log.debug('component-avatar', 'calling mind for state information') 101 d = defer.DeferredList([mind.callRemote('getConfig'), 102 mind.callRemote('getState'), 103 mind.callRemote('getMasterClockInfo')], 104 fireOnOneErrback=True) 105 d.addCallback(gotStates) 106 return d
107 makeAvatarInitArgs = classmethod(makeAvatarInitArgs) 108
109 - def onShutdown(self):
110 # doc in base class 111 self.info('component "%s" logged out', self.avatarId) 112 113 self.vishnu.unregisterComponent(self) 114 115 if self.clocking: 116 ip, port, base_time = self.clocking 117 self.vishnu.releasePortsOnWorker(self.getWorkerName(), 118 [port]) 119 120 self.componentState.clearJobState(self._shutdown_requested) 121 122 # FIXME: why? 123 self.componentState.set('moodPending', None) 124 125 self.componentState = None 126 self.jobState = None 127 128 self.heaven.componentDetached(self) 129 130 base.ManagerAvatar.onShutdown(self)
131 132 # my methods
133 - def addMessage(self, level, id, format, *args, **kwargs):
134 """ 135 Convenience message to construct a message and add it to the 136 component state. `format' should be marked as translatable in 137 the source with N_, and *args will be stored as format 138 arguments. Keyword arguments are passed on to the message 139 constructor. See L{flumotion.common.messages.Message} for the 140 meanings of the rest of the arguments. 141 142 For example:: 143 144 self.addMessage(messages.WARNING, 'foo-warning', 145 N_('The answer is %d'), 42, debug='not really') 146 """ 147 self.addMessageObject(messages.Message(level, 148 T_(format, *args), 149 id=id, **kwargs))
150
151 - def addMessageObject(self, message):
152 """ 153 Add a message to the planet state. 154 155 @type message: L{flumotion.common.messages.Message} 156 """ 157 self.componentState.append('messages', message)
158
159 - def upgradeConfig(self, state, conf):
160 # different from conf['version'], eh... 161 version = conf.get('config-version', 0) 162 while version < config.CURRENT_VERSION: 163 try: 164 config.UPGRADERS[version](conf) 165 version += 1 166 conf['config-version'] = version 167 except Exception, e: 168 self.addMessage(messages.WARNING, 169 'upgrade-%d' % version, 170 N_("Failed to upgrade config %r " 171 "from version %d. Please file " 172 "a bug."), conf, version, 173 debug=log.getExceptionMessage(e)) 174 return
175
176 - def makeComponentState(self, conf):
177 # the component just logged in with good credentials. we fetched 178 # its config and job state. now there are two possibilities: 179 # (1) we were waiting for such a component to start. There was 180 # a ManagerComponentState and an avatarId in the 181 # componentMappers waiting for us. 182 # (2) we don't know anything about this component, but it has a 183 # state and config. We deal with it, creating all the 184 # neccesary internal state. 185 def verifyExistingComponentState(conf, state): 186 # condition (1) 187 state.setJobState(self.jobState) 188 self.componentState = state 189 190 self.upgradeConfig(state, conf) 191 if state.get('config') != conf: 192 diff = config.dictDiff(state.get('config'), conf) 193 diffMsg = config.dictDiffMessageString(diff, 194 'internal conf', 195 'running conf') 196 self.addMessage(messages.WARNING, 'stale-config', 197 N_("Component logged in with stale " 198 "configuration. Consider stopping " 199 "this component and restarting " 200 "the manager."), 201 debug=("Updating internal conf from " 202 "running conf:\n" + diffMsg)) 203 self.warning('updating internal component state for %r', 204 state) 205 self.debug('changes to conf: %s', 206 config.dictDiffMessageString(diff)) 207 state.set('config', conf)
208 209 def makeNewComponentState(conf): 210 # condition (2) 211 state = planet.ManagerComponentState() 212 state.setJobState(self.jobState) 213 self.componentState = state 214 215 self.upgradeConfig(state, conf) 216 217 flowName, compName = conf['parent'], conf['name'] 218 219 state.set('name', compName) 220 state.set('type', conf['type']) 221 state.set('workerRequested', self.jobState.get('workerName')) 222 state.set('config', conf) 223 self.vishnu.addComponentToFlow(state, flowName) 224 return state 225 226 mState = self.vishnu.getManagerComponentState(self.avatarId) 227 if mState: 228 verifyExistingComponentState(conf, mState) 229 else: 230 makeNewComponentState(conf) 231
232 - def provideMasterClock(self):
233 """ 234 Tell the component to provide a master clock. 235 236 @rtype: L{twisted.internet.defer.Deferred} 237 """ 238 def success(clocking): 239 self.clocking = clocking 240 self.heaven.masterClockAvailable(self.avatarId, clocking)
241 242 def error(failure): 243 self.addMessage(messages.WARNING, 'provide-master-clock', 244 N_('Failed to provide the master clock'), 245 debug=log.getFailureMessage(failure)) 246 self.vishnu.releasePortsOnWorker(self.getWorkerName(), [port]) 247 248 if self.clocking: 249 self.heaven.masterClockAvailable(self.avatarId, self.clocking) 250 else: 251 (port,) = self.vishnu.reservePortsOnWorker(self.getWorkerName(), 1) 252 self.debug('provideMasterClock on port %d', port) 253 254 d = self.mindCallRemote('provideMasterClock', port) 255 d.addCallbacks(success, error) 256
257 - def getFeedServerPort(self):
258 """ 259 Returns the port on which a feed server for this component is 260 listening on. 261 262 @rtype: int 263 """ 264 return self.vishnu.getWorkerFeedServerPort(self.getWorkerName())
265
266 - def getRemoteManagerIP(self):
267 """ 268 Get the IP address of the manager as seen by the component. 269 270 @rtype: str 271 """ 272 return self.jobState.get('manager-ip')
273
274 - def getWorkerName(self):
275 """ 276 Return the name of the worker. 277 278 @rtype: str 279 """ 280 return self.jobState.get('workerName')
281
282 - def getPid(self):
283 """ 284 Return the PID of the component. 285 286 @rtype: int 287 """ 288 return self.jobState.get('pid')
289
290 - def getName(self):
291 """ 292 Get the name of the component. 293 294 @rtype: str 295 """ 296 return self.componentState.get('name')
297
298 - def getParentName(self):
299 """ 300 Get the name of the component's parent. 301 302 @rtype: str 303 """ 304 return self.componentState.get('parent').get('name')
305
306 - def getType(self):
307 """ 308 Get the component type name of the component. 309 310 @rtype: str 311 """ 312 return self.componentState.get('type')
313
314 - def getEaters(self):
315 """ 316 Get the set of eaters that this component eats from. 317 318 @rtype: dict of eaterName -> [(feedId, eaterAlias)] 319 """ 320 return self.componentState.get('config').get('eater', {})
321
322 - def getFeeders(self):
323 """ 324 Get the list of feeders that this component provides. 325 326 @rtype: list of feederName 327 """ 328 return self.componentState.get('config').get('feed', [])
329
330 - def getFeedId(self, feedName):
331 """ 332 Get the feedId of a feed provided or consumed by this component. 333 334 @param feedName: The name of the feed (i.e., eater alias or 335 feeder name) 336 @rtype: L{flumotion.common.common.feedId} 337 """ 338 return common.feedId(self.getName(), feedName)
339
340 - def getFullFeedId(self, feedName):
341 """ 342 Get the full feedId of a feed provided or consumed by this 343 component. 344 345 @param feedName: The name of the feed (i.e., eater alias or 346 feeder name) 347 @rtype: L{flumotion.common.common.fullFeedId} 348 """ 349 return common.fullFeedId(self.getParentName(), self.getName(), feedName)
350
351 - def getVirtualFeeds(self):
352 """ 353 Get the set of virtual feeds provided by this component. 354 355 @rtype: dict of fullFeedId -> (ComponentAvatar, feederName) 356 """ 357 conf = self.componentState.get('config') 358 ret = {} 359 for feedId, feederName in conf.get('virtual-feeds', {}).items(): 360 vComp, vFeed = common.parseFeedId(feedId) 361 ffid = common.fullFeedId(self.getParentName(), vComp, vFeed) 362 ret[ffid] = (self, feederName) 363 return ret
364
365 - def getWorker(self):
366 """ 367 Get the worker that this component should run on. 368 369 @rtype: str 370 """ 371 return self.componentState.get('workerRequested')
372
373 - def getClockMaster(self):
374 """ 375 Get this component's clock master, if any. 376 377 @rtype: avatarId or None 378 """ 379 return self.componentState.get('config')['clock-master']
380
381 - def stop(self):
382 """ 383 Tell the remote component to shut down. 384 """ 385 return self.mindCallRemote('stop')
386
387 - def setClocking(self, host, port, base_time):
388 # setMood on error? 389 return self.mindCallRemote('setMasterClock', host, port, base_time)
390
391 - def eatFrom(self, eaterAlias, fullFeedId, host, port):
392 self.debug('connecting eater %s to feed %s', eaterAlias, fullFeedId) 393 return self.mindCallRemote('eatFrom', eaterAlias, fullFeedId, 394 host, port)
395
396 - def feedTo(self, feederName, fullFeedId, host, port):
397 self.debug('connecting feeder %s to feed %s', feederName, fullFeedId) 398 return self.mindCallRemote('feedTo', feederName, fullFeedId, 399 host, port)
400 401 # FIXME: maybe make a BouncerComponentAvatar subclass ?
402 - def authenticate(self, keycard):
403 """ 404 Authenticate the given keycard. 405 Gets proxied to L{flumotion.component.bouncers.bouncer.""" \ 406 """BouncerMedium.remote_authenticate} 407 The component should be a subclass of 408 L{flumotion.component.bouncers.bouncer.Bouncer} 409 410 @type keycard: L{flumotion.common.keycards.Keycard} 411 """ 412 return self.mindCallRemote('authenticate', keycard)
413
414 - def removeKeycardId(self, keycardId):
415 """ 416 Remove a keycard managed by this bouncer because the requester 417 has gone. 418 419 @type keycardId: str 420 """ 421 return self.mindCallRemote('removeKeycardId', keycardId)
422
423 - def expireKeycard(self, keycardId):
424 """ 425 Expire a keycard issued to this component because the bouncer decided 426 to. 427 428 @type keycardId: str 429 """ 430 return self.mindCallRemote('expireKeycard', keycardId)
431 432 ### IPerspective methods, called by the worker's component
433 - def perspective_cleanShutdown(self):
434 """ 435 Called by a component to tell the manager that it's shutting down 436 cleanly (and thus should go to sleeping, rather than lost or sad) 437 """ 438 self.debug("shutdown is clean, shouldn't go to lost") 439 self._shutdown_requested = True
440
441 - def perspective_removeKeycardId(self, bouncerName, keycardId):
442 """ 443 Remove a keycard on the given bouncer on behalf of a component's medium. 444 445 This is requested by a component that created the keycard. 446 447 @type bouncerName: str 448 @param keycardId: id of keycard to remove 449 @type keycardId: str 450 """ 451 avatarId = common.componentId('atmosphere', bouncerName) 452 if not self.heaven.hasAvatar(avatarId): 453 self.warning('No bouncer with id %s registered', avatarId) 454 raise errors.UnknownComponentError(avatarId) 455 456 return self.heaven.getAvatar(avatarId).removeKeycardId(keycardId)
457
458 - def perspective_expireKeycard(self, requesterId, keycardId):
459 """ 460 Expire a keycard (and thus the requester's connection) 461 issued to the given requester. 462 463 This is called by the bouncer component that authenticated the keycard. 464 465 @param requesterId: name (avatarId) of the component that originally 466 requested authentication for the given keycardId 467 @type requesterId: str 468 @param keycardId: id of keycard to expire 469 @type keycardId: str 470 """ 471 # FIXME: we should also be able to expire manager bouncer keycards 472 if not self.heaven.hasAvatar(requesterId): 473 self.warning('asked to expire keycard %s for requester %s, ' 474 'but no such component registered', 475 keycardId, requesterId) 476 raise errors.UnknownComponentError(requesterId) 477 478 return self.heaven.getAvatar(requesterId).expireKeycard(keycardId)
479
480 -class dictlist(dict):
481 - def add(self, key, value):
482 if key not in self: 483 self[key] = [] 484 self[key].append(value)
485
486 - def remove(self, key, value):
487 self[key].remove(value) 488 if not self[key]: 489 del self[key]
490
491 -class FeedMap(object, log.Loggable):
492 logName = 'feed-map'
493 - def __init__(self):
494 self.avatars = {} 495 self._ordered_avatars = [] 496 self._dirty = True 497 self._recalc()
498
499 - def componentAttached(self, avatar):
500 assert avatar.avatarId not in self.avatars 501 self.avatars[avatar.avatarId] = avatar 502 self._ordered_avatars.append(avatar) 503 self._dirty = True
504
505 - def componentDetached(self, avatar):
506 # returns the a list of other components that will need to be 507 # reconnected 508 del self.avatars[avatar.avatarId] 509 self._ordered_avatars.remove(avatar) 510 self._dirty = True 511 # NB, feedDeps is dirty. Scrub it of avatars that have logged 512 # out 513 return [a for a in self.feedDeps.pop(avatar, []) 514 if a.avatarId in self.avatars]
515
516 - def getFeederAvatar(self, eater, feedId):
517 flowName = eater.getParentName() 518 compName, feedName = common.parseFeedId(feedId) 519 ffid = common.fullFeedId(flowName, compName, feedName) 520 feeder = None 521 if ffid in self.feeds: 522 feeder, feedName = self.feeds[ffid][0] 523 self.feedDeps.add(feeder, eater) 524 if feeder.getFeedId(feedName) != feedId: 525 self.debug('chose %s for feed %s', 526 feeder.getFeedId(feedName), feedId) 527 return feeder, feedName
528
529 - def _recalc(self):
530 if not self._dirty: 531 return 532 self.feedersForEaters = ffe = {} 533 self.eatersForFeeders = eff = dictlist() 534 self.feeds = dictlist() 535 self.feedDeps = dictlist() 536 537 for comp in self._ordered_avatars: 538 for feederName in comp.getFeeders(): 539 self.feeds.add(comp.getFullFeedId(feederName), 540 (comp, feederName)) 541 for ffid, pair in comp.getVirtualFeeds().items(): 542 self.feeds.add(ffid, pair) 543 544 for eater in self.avatars.values(): 545 for pairs in eater.getEaters().values(): 546 for feedId, eName in pairs: 547 feeder, fName = self.getFeederAvatar(eater, feedId) 548 if feeder: 549 ffe[eater.getFullFeedId(eName)] = (eName, feeder, fName) 550 eff.add(feeder.getFullFeedId(fName), 551 (fName, eater, eName)) 552 else: 553 self.debug('eater %s waiting for feed %s to log in', 554 eater.getFeedId(eName), feedId) 555 self._dirty = False
556
557 - def getFeedersForEaters(self, avatar):
558 """Get the set of feeds that this component is eating from, 559 keyed by eater alias. 560 561 @return: a list of (eaterAlias, feederAvatar, feedName) tuples 562 @rtype: list of (str, ComponentAvatar, str) 563 """ 564 self._recalc() 565 ret = [] 566 for tups in avatar.getEaters().values(): 567 for feedId, alias in tups: 568 ffid = avatar.getFullFeedId(alias) 569 if ffid in self.feedersForEaters: 570 ret.append(self.feedersForEaters[ffid]) 571 return ret
572
573 - def getEatersForFeeders(self, avatar):
574 """Get the set of eaters that this component feeds, keyed by 575 feeder name. 576 577 @return: a list of (feederName, eaterAvatar, eaterAlias) tuples 578 @rtype: list of (str, ComponentAvatar, str) 579 """ 580 self._recalc() 581 ret = [] 582 for feedName in avatar.getFeeders(): 583 ffid = avatar.getFullFeedId(feedName) 584 if ffid in self.eatersForFeeders: 585 ret.extend(self.eatersForFeeders[ffid]) 586 return ret
587
588 -class ComponentHeaven(base.ManagerHeaven):
589 """ 590 I handle all registered components and provide L{ComponentAvatar}s 591 for them. 592 """ 593 594 implements(interfaces.IHeaven) 595 avatarClass = ComponentAvatar 596 597 logCategory = 'comp-heaven' 598
599 - def __init__(self, vishnu):
600 # doc in base class 601 base.ManagerHeaven.__init__(self, vishnu) 602 self.feedMap = FeedMap()
603 604 ### our methods
605 - def feedServerAvailable(self, workerName):
606 self.debug('feed server %s logged in, we can connect to its port', 607 workerName) 608 # can be made more efficient 609 for avatar in self.avatars.values(): 610 if avatar.getWorkerName() == workerName: 611 self._setupClocking(avatar) 612 self._connectEatersAndFeeders(avatar)
613
614 - def masterClockAvailable(self, avatarId, clocking):
615 self.debug('master clock for %r provided on %r', avatarId, 616 clocking) 617 # can be made more efficient 618 for avatar in self.avatars.values(): 619 if avatar.avatarId != avatarId: 620 self._setupClocking(avatar)
621
622 - def _setupClocking(self, avatar):
623 master = avatar.getClockMaster() 624 if master: 625 if master == avatar.avatarId: 626 self.debug('Need for %r to provide a clock master', 627 master) 628 avatar.provideMasterClock() 629 else: 630 self.debug('Need to synchronize with clock master %r', 631 master) 632 # if master in self.avatars would be natural, but it seems 633 # that for now due to the getClocking() calls etc we need to 634 # check against the componentMapper set. could (and probably 635 # should) be fixed in the future. 636 m = self.vishnu.getComponentMapper(master) 637 if m and m.avatar: 638 clocking = m.avatar.clocking 639 if clocking: 640 host, port, base_time = clocking 641 avatar.setClocking(host, port, base_time) 642 else: 643 self.warning('%r should provide a clock master ' 644 'but is not doing so', master) 645 # should we componentAvatar.provideMasterClock() ? 646 else: 647 self.debug('clock master not logged in yet, will ' 648 'set clocking later')
649
650 - def componentAttached(self, avatar):
651 # No need to wait for any of this, they are not interdependent 652 assert avatar.avatarId in self.avatars 653 self.feedMap.componentAttached(avatar) 654 self._setupClocking(avatar) 655 self._connectEatersAndFeeders(avatar)
656
657 - def componentDetached(self, avatar):
658 assert avatar.avatarId not in self.avatars 659 compsNeedingReconnect = self.feedMap.componentDetached(avatar) 660 if self.vishnu.running: 661 for comp in compsNeedingReconnect: 662 self._connectEatersAndFeeders(comp)
663
664 - def mapNetFeed(self, fromAvatar, toAvatar):
665 toHost = toAvatar.getClientAddress() 666 toPort = toAvatar.getFeedServerPort() # can be None 667 668 # FIXME: until network map is implemented, hack to assume that 669 # connections from what appears to us to be the same IP go 670 # through localhost instead. Allows connections between 671 # components on a worker behind a firewall, but not between 672 # components running on different workers, both behind a 673 # firewall 674 fromHost = fromAvatar.mind.broker.transport.getPeer().host 675 if fromHost == toHost: 676 toHost = '127.0.0.1' 677 678 return toHost, toPort
679
680 - def _connectEatersAndFeeders(self, avatar):
681 def connect(fromComp, fromFeed, toComp, toFeed, method): 682 host, port = self.mapNetFeed(fromComp, toComp) 683 if port: 684 fullFeedId = toComp.getFullFeedId(toFeed) 685 proc = getattr(fromComp, method) 686 proc(fromFeed, fullFeedId, host, port) 687 else: 688 self.debug('postponing connection to %s: feed server ' 689 'unavailable', toComp.getFeedId(toFeed))
690 691 # FIXME: all connections are upstream for now 692 def always(otherComp): 693 return True
694 def never(otherComp): 695 return False 696 directions = [(self.feedMap.getFeedersForEaters, 697 always, 'eatFrom', 'feedTo'), 698 (self.feedMap.getEatersForFeeders, 699 never, 'feedTo', 'eatFrom')] 700 701 myComp = avatar 702 for getPeers, initiate, directMethod, reversedMethod in directions: 703 for myFeedName, otherComp, otherFeedName in getPeers(myComp): 704 if initiate(otherComp): 705 # we initiate the connection 706 connect(myComp, myFeedName, otherComp, otherFeedName, 707 directMethod) 708 else: 709 # make the other component initiate connection 710 connect(otherComp, otherFeedName, myComp, myFeedName, 711 reversedMethod) 712