Package flumotion :: Package worker :: Module medium
[hide private]

Source Code for Module flumotion.worker.medium

  1  # -*- Mode: Python; test-case-name:flumotion.test.test_worker_worker -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  worker-side objects to handle worker clients 
 24  """ 
 25   
 26  import signal 
 27   
 28  from twisted.internet import reactor, error 
 29  from twisted.spread import flavors 
 30  from zope.interface import implements 
 31   
 32  from flumotion.common import errors, interfaces, log 
 33  from flumotion.common import medium 
 34  from flumotion.twisted import pb as fpb 
 35   
 36  JOB_SHUTDOWN_TIMEOUT = 5 
 37   
 38  factoryClass = fpb.ReconnectingFPBClientFactory 
39 -class WorkerClientFactory(factoryClass):
40 """ 41 I am a client factory for the worker to log in to the manager. 42 """ 43 logCategory = 'worker' 44 perspectiveInterface = interfaces.IWorkerMedium 45
46 - def __init__(self, medium, host, port):
47 """ 48 @type medium: L{flumotion.worker.medium.WorkerMedium} 49 @type host: str 50 @type port: int 51 """ 52 self._managerHost = host 53 self._managerPort = port 54 self.medium = medium 55 # doing this as a class method triggers a doc error 56 factoryClass.__init__(self) 57 # maximum 10 second delay for workers to attempt to log in again 58 self.maxDelay = 10
59
60 - def clientConnectionFailed(self, connector, reason):
61 """ 62 @param reason: L{twisted.spread.pb.failure.Failure} 63 """ 64 # this method exists so that we log the failure 65 fpb.ReconnectingFPBClientFactory.clientConnectionFailed(self, 66 connector, reason) 67 # delay is now updated 68 self.debug("failed to connect, will try to reconnect in %f seconds" % self.delay)
69 70 ### ReconnectingPBClientFactory methods
71 - def gotDeferredLogin(self, d):
72 # the deferred from the login is now available 73 # add some of our own to it 74 def remoteDisconnected(remoteReference): 75 if reactor.killed: 76 self.log('Connection to manager lost due to shutdown') 77 else: 78 self.warning('Lost connection to manager, ' 79 'will attempt to reconnect')
80 81 def loginCallback(reference): 82 self.info("Logged in to manager") 83 self.debug("remote reference %r" % reference) 84 85 self.medium.setRemoteReference(reference) 86 reference.notifyOnDisconnect(remoteDisconnected)
87 88 def alreadyConnectedErrback(failure): 89 failure.trap(errors.AlreadyConnectedError) 90 self.warning('A worker with the name "%s" is already connected.' % 91 failure.value) 92 93 def accessDeniedErrback(failure): 94 failure.trap(errors.NotAuthenticatedError) 95 self.warning('Access denied.') 96 97 def connectionRefusedErrback(failure): 98 failure.trap(error.ConnectionRefusedError) 99 self.warning('Connection to %s:%d refused.' % (self._managerHost, 100 self._managerPort)) 101 102 def NoSuchMethodErrback(failure): 103 failure.trap(flavors.NoSuchMethod) 104 # failure.value is a str 105 if failure.value.find('remote_getKeycardClasses') > -1: 106 self.warning( 107 "Manager %s:%d is older than version 0.3.0. " 108 "Please upgrade." % (self._managerHost, self._managerPort)) 109 return 110 111 return failure 112 113 def loginFailedErrback(failure): 114 self.warning('Login failed, reason: %s' % str(failure)) 115 116 d.addCallback(loginCallback) 117 d.addErrback(accessDeniedErrback) 118 d.addErrback(connectionRefusedErrback) 119 d.addErrback(alreadyConnectedErrback) 120 d.addErrback(NoSuchMethodErrback) 121 d.addErrback(loginFailedErrback) 122
123 -class WorkerMedium(medium.PingingMedium):
124 """ 125 I am a medium interfacing with the manager-side WorkerAvatar. 126 127 @ivar brain: the worker brain 128 @type brain: L{WorkerBrain} 129 """ 130 131 logCategory = 'workermedium' 132 133 implements(interfaces.IWorkerMedium) 134
135 - def __init__(self, brain):
136 """ 137 @type brain: L{WorkerBrain} 138 """ 139 self.brain = brain 140 self.factory = None
141
142 - def startConnecting(self, connectionInfo):
143 info = connectionInfo 144 145 self.factory = WorkerClientFactory(self, info.host, info.port) 146 self.factory.startLogin(info.authenticator) 147 148 if info.use_ssl: 149 from flumotion.common import common 150 common.assertSSLAvailable() 151 from twisted.internet import ssl 152 reactor.connectSSL(info.host, info.port, self.factory, 153 ssl.ClientContextFactory()) 154 else: 155 reactor.connectTCP(info.host, info.port, self.factory)
156
157 - def stopConnecting(self):
158 # only called by test suites 159 self.factory.disconnect() 160 self.factory.stopTrying()
161 162 ### pb.Referenceable method for the manager's WorkerAvatar
163 - def remote_getPorts(self):
164 """ 165 Gets the set of TCP ports that this worker is configured to use. 166 167 @rtype: 2-tuple: (list of int, bool) 168 @return: list of ports, and a boolean if we allocate ports 169 randomly 170 """ 171 return self.brain.getPorts()
172
173 - def remote_getFeedServerPort(self):
174 """ 175 Return the TCP port the Feed Server is listening on. 176 177 @rtype: int, or NoneType 178 @return: TCP port number, or None if there is no feed server 179 """ 180 return self.brain.getFeedServerPort()
181
182 - def remote_create(self, avatarId, type, moduleName, methodName, 183 nice, conf):
184 """ 185 Start a component of the given type with the given nice level. 186 Will spawn a new job process to run the component in. 187 188 @param avatarId: avatar identification string 189 @type avatarId: str 190 @param type: type of the component to create 191 @type type: str 192 @param moduleName: name of the module to create the component from 193 @type moduleName: str 194 @param methodName: the factory method to use to create the component 195 @type methodName: str 196 @param nice: nice level 197 @type nice: int 198 @param conf: component config 199 @type conf: dict 200 201 @returns: a deferred fired when the process has started and created 202 the component 203 """ 204 return self.brain.create(avatarId, type, moduleName, methodName, 205 nice, conf)
206
207 - def remote_checkElements(self, elementNames):
208 """ 209 Checks if one or more GStreamer elements are present and can be 210 instantiated. 211 212 @param elementNames: names of the Gstreamer elements 213 @type elementNames: list of str 214 215 @rtype: list of str 216 @returns: a list of instantiatable element names 217 """ 218 return self.brain.runCheck('flumotion.worker.checks.check', 219 'checkElements', elementNames)
220
221 - def remote_checkImport(self, moduleName):
222 """ 223 Checks if the given module can be imported. 224 225 @param moduleName: name of the module to check 226 @type moduleName: str 227 228 @returns: None or Failure 229 """ 230 return self.brain.runCheck('flumotion.worker.checks.check', 'checkImport', 231 moduleName)
232
233 - def remote_runCheck(self, module, function, *args, **kwargs):
234 """ 235 Runs the given function in the given module with the given arguments. 236 237 @param module: module the function lives in 238 @type module: str 239 @param function: function to run 240 @type function: str 241 242 @returns: the return value of the given function in the module. 243 """ 244 return self.brain.runCheck(module, function, *args, **kwargs)
245 remote_runFunction = remote_runCheck 246
247 - def remote_getComponents(self):
248 """ 249 I return a list of componentAvatarIds, I have. I am called by the 250 manager soon after I attach to it. This is needed on reconnects 251 so that the manager knows what components it needs to start on me. 252 253 @returns: a list of componentAvatarIds 254 """ 255 return self.brain.getComponents()
256
257 - def remote_killJob(self, avatarId, signum=signal.SIGKILL):
258 """Kill one of the worker's jobs. 259 260 This method is intended for exceptional purposes only; a normal 261 component shutdown is performed by the manager via calling 262 remote_stop() on the component avatar. 263 264 Raises L{flumotion.common.errors.UnknownComponentError} if the 265 job is unknown. 266 267 @param avatarId: the avatar Id of the component, e.g. 268 '/default/audio-encoder' 269 @type avatarId: string 270 @param signum: Signal to send, optional. Defaults to SIGKILL. 271 @type signum: int 272 """ 273 self.brain.killJob(avatarId, signum)
274