Package flumotion :: Package common :: Module medium
[hide private]

Source Code for Module flumotion.common.medium

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  Contains the base class for PB client-side mediums. 
 24  """ 
 25   
 26  import time 
 27   
 28  from twisted.spread import pb 
 29  from twisted.internet import defer, reactor 
 30  from zope.interface import implements 
 31   
 32  from flumotion.common import log, interfaces, bundleclient, errors, common 
 33  from flumotion.common import messages 
 34  from flumotion.configure import configure 
 35  from flumotion.twisted import pb as fpb 
 36   
37 -class BaseMedium(fpb.Referenceable):
38 """ 39 I am a base interface for PB clients interfacing with PB server-side 40 avatars. 41 Used by admin/worker/component to talk to manager's vishnu, 42 and by job to talk to worker's brain. 43 44 @ivar remote: a remote reference to the server-side object on 45 which perspective_(methodName) methods can be called 46 @type remote: L{twisted.spread.pb.RemoteReference} 47 @type bundleLoader: L{flumotion.common.bundleclient.BundleLoader} 48 """ 49 50 # subclasses will need to set this to the specific medium type 51 # tho... 52 implements(interfaces.IMedium) 53 logCategory = "basemedium" 54 remoteLogName = "baseavatar" 55 56 remote = None 57 bundleLoader = None 58
59 - def setRemoteReference(self, remoteReference):
60 """ 61 Set the given remoteReference as the reference to the server-side 62 avatar. 63 64 @param remoteReference: L{twisted.spread.pb.RemoteReference} 65 """ 66 self.debug('%r.setRemoteReference: %r' % (self, remoteReference)) 67 self.remote = remoteReference 68 def nullRemote(x): 69 self.debug('%r: disconnected from %r' % (self, self.remote)) 70 self.remote = None
71 self.remote.notifyOnDisconnect(nullRemote) 72 73 self.bundleLoader = bundleclient.BundleLoader(self.callRemote) 74 75 # figure out connection addresses if it's an internet address 76 tarzan = None 77 jane = None 78 try: 79 transport = remoteReference.broker.transport 80 tarzan = transport.getHost() 81 jane = transport.getPeer() 82 except Exception, e: 83 self.debug("could not get connection info, reason %r" % e) 84 if tarzan and jane: 85 self.debug("connection is from me on %s to remote on %s" % ( 86 common.addressGetHost(tarzan), 87 common.addressGetHost(jane)))
88
89 - def hasRemoteReference(self):
90 """ 91 Does the medium have a remote reference to a server-side avatar ? 92 """ 93 return self.remote != None
94
95 - def callRemoteLogging(self, level, stackDepth, name, *args, **kwargs):
96 """ 97 Call the given method with the given arguments remotely on the 98 server-side avatar. 99 100 Gets serialized to server-side perspective_ methods. 101 102 @param level: the level we should log at (log.DEBUG, log.INFO, etc) 103 @type level: int 104 @param stackDepth: the number of stack frames to go back to get 105 file and line information, negative or zero. 106 @type stackDepth: non-positive int 107 @param name: name of the remote method 108 @type name: str 109 """ 110 if level is not None: 111 debugClass = str(self.__class__).split(".")[-1].upper() 112 startArgs = [self.remoteLogName, debugClass, name] 113 format, debugArgs = log.getFormatArgs( 114 '%s --> %s: callRemote(%s, ', startArgs, 115 ')', (), args, kwargs) 116 logKwArgs = self.doLog(level, stackDepth - 1, 117 format, *debugArgs) 118 119 if not self.remote: 120 self.warning('Tried to callRemote(%s), but we are disconnected' 121 % name) 122 return defer.fail(errors.NotConnectedError()) 123 124 def callback(result): 125 format, debugArgs = log.getFormatArgs( 126 '%s <-- %s: callRemote(%s, ', startArgs, 127 '): %s', (log.ellipsize(result), ), args, kwargs) 128 self.doLog(level, -1, format, *debugArgs, **logKwArgs) 129 return result
130 131 def errback(failure): 132 format, debugArgs = log.getFormatArgs( 133 '%s <-- %s: callRemote(%s, ', startArgs, 134 '): %r', (failure, ), args, kwargs) 135 self.doLog(level, -1, format, *debugArgs, **logKwArgs) 136 return failure 137 138 d = self.remote.callRemote(name, *args, **kwargs) 139 if level is not None: 140 d.addCallbacks(callback, errback) 141 return d 142
143 - def callRemote(self, name, *args, **kwargs):
144 """ 145 Call the given method with the given arguments remotely on the 146 server-side avatar. 147 148 Gets serialized to server-side perspective_ methods. 149 """ 150 return self.callRemoteLogging(log.DEBUG, -1, name, *args, 151 **kwargs)
152
153 - def getBundledFunction(self, module, function):
154 """ 155 Returns the given function in the given module, loading the 156 module from a bundle. 157 158 If we can't find the bundle for the given module, or if the 159 given module does not contain the requested function, we will 160 raise L{flumotion.common.errors.RemoteRunError} (perhaps a 161 poorly chosen error). If importing the module raises an 162 exception, that exception will be passed through unmodified. 163 164 @param module: module the function lives in 165 @type module: str 166 @param function: function to run 167 @type function: str 168 169 @returns: a callable, the given function in the given module. 170 """ 171 def gotModule(mod): 172 if hasattr(mod, function): 173 return getattr(mod, function) 174 else: 175 msg = 'No procedure named %s in module %s' % (function, 176 module) 177 self.warning('%s', msg) 178 raise errors.RemoteRunError(msg)
179 180 def gotModuleError(failure): 181 failure.trap(errors.NoBundleError) 182 msg = 'Failed to find bundle for module %s' % module 183 self.warning('%s', msg) 184 raise errors.RemoteRunError(msg) 185 186 d = self.bundleLoader.loadModule(module) 187 d.addCallbacks(gotModule, gotModuleError) 188 return d 189
190 - def runBundledFunction(self, module, function, *args, **kwargs):
191 """ 192 Runs the given function in the given module with the given 193 arguments. 194 195 This method calls getBundledFunction and then invokes the 196 function. Any error raised by getBundledFunction or by invoking 197 the function will be passed through unmodified. 198 199 Callers that expect to return their result over a PB connection 200 should catch nonserializable exceptions so as to prevent nasty 201 backtraces in the logs. 202 203 @param module: module the function lives in 204 @type module: str 205 @param function: function to run 206 @type function: str 207 208 @returns: the return value of the given function in the module. 209 """ 210 self.debug('runBundledFunction(%r, %r)', module, function) 211 def gotFunction(proc): 212 def invocationError(failure): 213 self.warning('Exception raised while calling ' 214 '%s.%s(*args=%r, **kwargs=%r): %s', 215 module, function, args, kwargs, 216 log.getFailureMessage(failure)) 217 return failure
218 219 self.debug('calling %s.%s(%r, %r)', module, function, args, 220 kwargs) 221 d = defer.maybeDeferred(proc, *args, **kwargs) 222 d.addErrback(invocationError) 223 return d 224 225 d = self.getBundledFunction(module, function) 226 d.addCallback(gotFunction) 227 return d 228
class PingingMedium(BaseMedium):
    """
    I am a medium that pings the server-side avatar every _pingInterval
    seconds, and that calls a disconnect method when no ping reply has
    been received for more than _pingCheckInterval seconds.

    Pinging starts when the remote reference is set and stops when the
    remote reference notifies us of a disconnect.
    """
    _pingInterval = configure.heartbeatInterval
    _pingCheckInterval = configure.heartbeatInterval * 2.5
    # pending delayed calls for the next _ping and _pingCheck, or None.
    # Both default to None at class level so stopPinging can be called
    # before startPinging ever ran.
    _pingDC = None
    _pingCheckDC = None

    def startPinging(self, disconnect):
        """
        Start pinging the remote side and checking for ping replies.
        Does nothing besides resetting the last reply time when we are
        already pinging.

        @param disconnect: a method to call when we do not get ping replies
        @type  disconnect: callable
        """
        self.debug('startPinging')
        self._lastPingback = time.time()
        if self._pingDC:
            self.debug("Cannot start pinging, already pinging")
            return
        self._pingDisconnect = disconnect
        self._ping()
        self._pingCheck()

    def _ping(self):
        """
        Send one ping if we are connected, and schedule the next one.
        """
        def pingback(result):
            self._lastPingback = time.time()
            self.log('pinged, pingback at %r' % self._lastPingback)

        def pingFailed(failure):
            # a ping that was in flight when the connection dropped is not
            # an error worth an unhandled-failure traceback; _pingCheck
            # notices the missing reply.  Other failures are passed on.
            failure.trap(pb.PBConnectionLost)
            self.log('ping failed: %s' % log.getFailureMessage(failure))

        if self.remote:
            self.log('pinging')
            d = self.callRemoteLogging(log.LOG, 0, 'ping')
            d.addCallbacks(pingback, pingFailed)
        else:
            self.info('tried to ping, but disconnected yo')

        self._pingDC = reactor.callLater(self._pingInterval,
                                         self._ping)

    def _pingCheck(self):
        """
        Call the disconnect method if we are connected and the last ping
        reply is older than _pingCheckInterval; otherwise schedule the
        next check.
        """
        self._pingCheckDC = None
        if (self.remote and
            (time.time() - self._lastPingback > self._pingCheckInterval)):
            self.info('no pingback in %f seconds, closing connection',
                      self._pingCheckInterval)
            self._pingDisconnect()
        else:
            self._pingCheckDC = reactor.callLater(self._pingCheckInterval,
                                                  self._pingCheck)

    def stopPinging(self):
        """
        Cancel the pending ping and ping check calls, if any.
        """
        if self._pingCheckDC:
            self._pingCheckDC.cancel()
        self._pingCheckDC = None

        if self._pingDC:
            self._pingDC.cancel()
        self._pingDC = None

    def _disconnect(self):
        """
        Close the transport of the remote reference, if we have one.
        This is the disconnect method handed to startPinging by
        setRemoteReference.
        """
        if self.remote:
            self.remote.broker.transport.loseConnection()

    def setRemoteReference(self, remote):
        """
        Set the remote reference as in L{BaseMedium.setRemoteReference},
        then start pinging; pinging is stopped again when the reference
        notifies us of a disconnect.

        @param remote: L{twisted.spread.pb.RemoteReference}
        """
        BaseMedium.setRemoteReference(self, remote)

        def stopPingingCb(x):
            self.debug('stop pinging')
            self.stopPinging()
        self.remote.notifyOnDisconnect(stopPingingCb)

        self.startPinging(self._disconnect)
