Package flumotion :: Package manager :: Module worker
[hide private]

Source Code for Module flumotion.manager.worker

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_manager_worker -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  manager-side objects to handle worker clients 
 24  """ 
 25   
 26  from twisted.internet import defer 
 27   
 28  from flumotion.manager import base 
 29  from flumotion.common import errors, interfaces, log, registry 
 30  from flumotion.common import config, worker, common 
 31   
32 -class WorkerAvatar(base.ManagerAvatar):
33 """ 34 I am an avatar created for a worker. 35 A reference to me is given when logging in and requesting a worker avatar. 36 I live in the manager. 37 38 @ivar feedServerPort: TCP port the feed server is listening on 39 @type feedServerPort: int 40 """ 41 logCategory = 'worker-avatar' 42 43 _portSet = None 44 feedServerPort = None 45
46 - def __init__(self, heaven, avatarId, remoteIdentity, mind, 47 feedServerPort, ports, randomPorts):
48 base.ManagerAvatar.__init__(self, heaven, avatarId, 49 remoteIdentity, mind) 50 self.feedServerPort = feedServerPort 51 52 self._portSet = worker.PortSet(self.avatarId, ports, randomPorts) 53 54 self.heaven.workerAttached(self) 55 self.vishnu.workerAttached(self)
56
57 - def getName(self):
58 return self.avatarId
59
60 - def makeAvatarInitArgs(klass, heaven, avatarId, remoteIdentity, 61 mind):
62 def havePorts(res): 63 log.debug('worker-avatar', 'got port information') 64 (_s1, feedServerPort), (_s2, (ports, random)) = res 65 return (heaven, avatarId, remoteIdentity, mind, 66 feedServerPort, ports, random)
67 log.debug('worker-avatar', 'calling mind for port information') 68 d = defer.DeferredList([mind.callRemote('getFeedServerPort'), 69 mind.callRemote('getPorts')], 70 fireOnOneErrback=True) 71 d.addCallback(havePorts) 72 return d
73 makeAvatarInitArgs = classmethod(makeAvatarInitArgs) 74
75 - def onShutdown(self):
76 self.heaven.workerDetached(self) 77 self.vishnu.workerDetached(self) 78 base.ManagerAvatar.onShutdown(self)
79
80 - def reservePorts(self, numPorts):
81 """ 82 Reserve the given number of ports on the worker. 83 84 @param numPorts: how many ports to reserve 85 @type numPorts: int 86 """ 87 return self._portSet.reservePorts(numPorts)
88
89 - def releasePorts(self, ports):
90 """ 91 Release the given list of ports on the worker. 92 93 @param ports: list of ports to release 94 @type ports: list of int 95 """ 96 self._portSet.releasePorts(ports)
97
98 - def createComponent(self, avatarId, type, nice, conf):
99 """ 100 Create a component of the given type with the given nice level. 101 102 @param avatarId: avatarId the component should use to log in 103 @type avatarId: str 104 @param type: type of the component to create 105 @type type: str 106 @param nice: the nice level to create the component at 107 @type nice: int 108 @param conf: the component's config dict 109 @type conf: dict 110 111 @returns: a deferred that will give the avatarId the component 112 will use to log in to the manager 113 """ 114 self.debug('creating %s (%s) on worker %s with nice level %d', 115 avatarId, type, self.avatarId, nice) 116 defs = registry.getRegistry().getComponent(type) 117 try: 118 entry = defs.getEntryByType('component') 119 # FIXME: use entry.getModuleName() (doesn't work atm?) 120 moduleName = defs.getSource() 121 methodName = entry.getFunction() 122 except KeyError: 123 self.warning('no "component" entry in registry of type %s, %s', 124 type, 'falling back to createComponent') 125 moduleName = defs.getSource() 126 methodName = "createComponent" 127 128 self.debug('call remote create') 129 return self.mindCallRemote('create', avatarId, type, moduleName, 130 methodName, nice, conf)
131
132 - def getComponents(self):
133 """ 134 Get a list of components that the worker is running. 135 136 @returns: a deferred that will give the avatarIds running on the 137 worker 138 """ 139 self.debug('getting component list from worker %s' % 140 self.avatarId) 141 return self.mindCallRemote('getComponents')
142 143 ### IPerspective methods, called by the worker's component
144 - def perspective_componentAddMessage(self, avatarId, message):
145 """ 146 Called by the worker to tell the manager to add a given message to 147 the given component. 148 149 Useful in cases where the component can't report messages itself, 150 for example because it crashed. 151 152 @param avatarId: avatarId of the component the message is about 153 @type message: L{flumotion.common.messages.Message} 154 """ 155 self.debug('received message from component %s' % avatarId) 156 self.vishnu.componentAddMessage(avatarId, message)
157
158 -class WorkerHeaven(base.ManagerHeaven):
159 """ 160 I interface between the Manager and worker clients. 161 For each worker client I create an L{WorkerAvatar} to handle requests. 162 I live in the manager. 163 """ 164 165 logCategory = "workerheaven" 166 avatarClass = WorkerAvatar 167
168 - def __init__(self, vishnu):
171 172 ### my methods
173 - def workerAttached(self, workerAvatar):
174 """ 175 Notify the heaven that the given worker has logged in. 176 177 @type workerAvatar: L{WorkerAvatar} 178 """ 179 workerName = workerAvatar.getName() 180 if not workerName in self.state.get('names'): 181 # wheee 182 host = workerAvatar.mind.broker.transport.getPeer().host 183 state = worker.ManagerWorkerState(name=workerName, host=host) 184 self.state.append('names', workerName) 185 self.state.append('workers', state) 186 else: 187 self.warning('worker %s was already registered in the heaven', 188 workerName) 189 raise errors.AlreadyConnectedError()
190
191 - def workerDetached(self, workerAvatar):
192 """ 193 Notify the heaven that the given worker has logged out. 194 195 @type workerAvatar: L{WorkerAvatar} 196 """ 197 workerName = workerAvatar.getName() 198 try: 199 self.state.remove('names', workerName) 200 for state in list(self.state.get('workers')): 201 if state.get('name') == workerName: 202 self.state.remove('workers', state) 203 except ValueError: 204 self.warning('worker %s was never registered in the heaven', 205 workerName)
206