Package flumotion :: Package component :: Module feed
[hide private]

Source Code for Module flumotion.component.feed

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_worker_feed -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  implementation of a PB Client to interface with feedserver.py 
 24  """ 
 25   
 26  import socket 
 27  import os 
 28   
 29  from twisted.internet import reactor, main, defer, tcp 
 30  from twisted.python import failure 
 31  from zope.interface import implements 
 32   
 33  from flumotion.common import log, common, interfaces 
 34  from flumotion.twisted import pb as fpb 
 35   
 36   
 37  # copied from fdserver.py so that it can be bundled 
class _SocketMaybeCloser(tcp._SocketCloser):
    """
    Socket closer that can be told to leave the connection itself alone.

    While C{keepSocketAlive} is False, closing is delegated to
    L{tcp._SocketCloser}.  Once it is True, only C{close()} is called on
    our socket object and socket errors from that call are ignored.
    """
    keepSocketAlive = False

    def _closeSocket(self):
        # Override of tcp._SocketCloser._closeSocket.  Normally the socket is
        # closed properly by the parent implementation.  After the socket has
        # been passed on via the FD-channel we only close() it; we do not
        # call shutdown(), which would close the TCP channel without closing
        # the FD itself.
        if not self.keepSocketAlive:
            tcp._SocketCloser._closeSocket(self)
            return

        try:
            self.socket.close()
        except socket.error:
            pass
53
class PassableClientConnection(_SocketMaybeCloser, tcp.Client):
    """
    A L{tcp.Client} transport that closes its socket through
    L{_SocketMaybeCloser}, which comes first in the base class list.
    """
56
class PassableClientConnector(tcp.Connector):
    """
    A L{tcp.Connector} that builds L{PassableClientConnection} transports.
    """

    # Overriding this private-ish method is unfortunate, but the only
    # alternative seems to be reimplementing BaseConnector.connect().
    # tcp.py itself goes down this path, so we follow it.
    def _makeTransport(self):
        transport = PassableClientConnection(
            self.host, self.port, self.bindAddress, self, self.reactor)
        return transport
65
class FeedClientFactory(fpb.FPBClientFactory, log.Loggable):
    """
    I am the client factory that a feed component's medium uses to log
    in to a worker and exchange feeds.

    @ivar medium: the medium handed to my constructor
    """
    logCategory = 'feedclient'
    perspectiveInterface = interfaces.IFeedMedium

    def __init__(self, medium):
        """
        @param medium: the medium to keep a reference to as C{self.medium}
        """
        # Explicit base-class call: the base initializer takes no arguments
        # here and must run before we attach our own state.
        fpb.FPBClientFactory.__init__(self)
        self.medium = medium
# not a BaseMedium because we are going to do strange things to the transport
class FeedMedium(fpb.Referenceable):
    """
    I am a client for a Feed Server.

    I am used as the remote interface between a component and another
    component.

    @ivar component: the component this is a feed client for
    @type component: L{flumotion.component.feedcomponent.FeedComponent}
    @ivar remote:    a reference to a L{FeedAvatar}
    @type remote:    L{twisted.spread.pb.RemoteReference}
    """
    logCategory = 'feedmedium'
    remoteLogName = 'feedserver'
    implements(interfaces.IFeedMedium)

    remote = None

    def __init__(self, logName=None):
        """
        @param logName: optional name stored as C{self.logName}
        @type  logName: str
        """
        if logName:
            assert isinstance(logName, str)
        self.logName = logName
        # set by startConnecting(), cleared again by stopConnecting()
        self._factory = None
        # fired by _doFeedTo() with (feedId, fd); returned from requestFeed()
        self._feedToDeferred = defer.Deferred()

    def startConnecting(self, host, port, authenticator, timeout=30,
                        bindAddress=None):
        """Optional helper method to connect to a remote feed server.

        This method starts a client factory connecting via a
        L{PassableClientConnector}. It offers the possibility of
        cancelling an in-progress connection via the stopConnecting()
        method.

        @param host:          the remote host name
        @type  host:          str
        @param port:          the tcp port on which to connect
        @type  port:          int
        @param authenticator: the authenticator, normally provided by
                              the worker
        @type  authenticator: L{flumotion.twisted.pb.Authenticator}
        @param timeout:       connection timeout, handed unchanged to the
                              connector
        @param bindAddress:   local address to bind to, handed unchanged
                              to the connector

        @returns: a deferred that will fire with the remote reference,
                  once we have authenticated.
        """
        # only one connection attempt per medium at a time
        assert self._factory is None
        self._factory = FeedClientFactory(self)
        reactor.connectWith(PassableClientConnector, host, port,
                            self._factory, timeout, bindAddress)
        return self._factory.login(authenticator)

    def requestFeed(self, host, port, authenticator, fullFeedId):
        """Request a feed from a remote feed server.

        This helper method calls startConnecting() to make the
        connection and authenticate, and will return the feed file
        descriptor or an error. A pending connection attempt can be
        cancelled via stopConnecting().

        @param host:          the remote host name
        @type  host:          str
        @param port:          the tcp port on which to connect
        @type  port:          int
        @param authenticator: the authenticator, normally provided by
                              the worker
        @type  authenticator: L{flumotion.twisted.pb.Authenticator}
        @param fullFeedId:    the full feed id (/flow/component:feed)
                              offered by the remote side
        @type  fullFeedId:    str

        @returns: a deferred that, if successful, will fire with a pair
                  (feedId, fd). In an error case it will errback and
                  close the remote connection.
        """
        def connected(remote):
            self.setRemoteReference(remote)
            return remote.callRemote('sendFeed', fullFeedId)

        def feedSent(res):
            # res is None
            # either just before or just after this, we received a
            # sendFeedReply call from the feedserver. so now we're
            # waiting on the component to get its fd
            return self._feedToDeferred

        def error(f):
            self.warning('failed to retrieve %s from %s:%d', fullFeedId,
                         host, port)
            self.debug('failure: %s', log.getFailureMessage(f))
            self.debug('closing connection')
            self.stopConnecting()
            # keep the errback chain going for our caller
            return f

        d = self.startConnecting(host, port, authenticator)
        d.addCallback(connected)
        d.addCallback(feedSent)
        d.addErrback(error)
        return d

    def sendFeed(self, host, port, authenticator, fullFeedId):
        """Send a feed to a remote feed server.

        This helper method calls startConnecting() to make the
        connection and authenticate, and will return the feed file
        descriptor or an error. A pending connection attempt can be
        cancelled via stopConnecting().

        @param host:          the remote host name
        @type  host:          str
        @param port:          the tcp port on which to connect
        @type  port:          int
        @param authenticator: the authenticator, normally provided by
                              the worker
        @type  authenticator: L{flumotion.twisted.pb.Authenticator}
        @param fullFeedId:    the full feed id (/flow/component:eaterAlias)
                              to feed to on the remote side
        @type  fullFeedId:    str

        @returns: a deferred that, if successful, will fire with a pair
                  (fullFeedId, fd). In an error case it will errback and
                  close the remote connection.
        """
        def connected(remote):
            assert isinstance(remote.broker.transport, _SocketMaybeCloser)
            self.setRemoteReference(remote)
            return remote.callRemote('receiveFeed', fullFeedId)

        def feedSent(res):
            t = self.remote.broker.transport
            self.debug('stop reading from transport')
            t.stopReading()

            self.debug('flushing PB write queue')
            t.doWrite()
            self.debug('stop writing to transport')
            t.stopWriting()

            # make sure shutdown() is not called on the socket, then take
            # our own duplicate of the descriptor to hand to the caller
            t.keepSocketAlive = True
            fd = os.dup(t.fileno())

            # avoid refcount cycles
            self.setRemoteReference(None)

            fdDeferred = defer.Deferred()

            def loseConnection():
                t.connectionLost(failure.Failure(main.CONNECTION_DONE))
                fdDeferred.callback((fullFeedId, fd))

            # tear the transport down from a fresh reactor iteration
            reactor.callLater(0, loseConnection)
            return fdDeferred

        def error(f):
            self.warning('failed to send %s to %s:%d', fullFeedId,
                         host, port)
            self.debug('failure: %s', log.getFailureMessage(f))
            self.debug('closing connection')
            self.stopConnecting()
            # keep the errback chain going for our caller
            return f

        d = self.startConnecting(host, port, authenticator)
        d.addCallback(connected)
        d.addCallback(feedSent)
        d.addErrback(error)
        return d

    def stopConnecting(self):
        """Stop a pending or established connection made via
        startConnecting().

        Stops any established or pending connection to a remote feed
        server started via the startConnecting() method. Safe to call
        even if connection has not been started.
        """
        if self._factory:
            self._factory.disconnect()
            self._factory = None
        # not sure if this is necessary; call it just in case, so we
        # don't leave a lingering reference cycle
        self.setRemoteReference(None)

    ### IMedium methods

    def setRemoteReference(self, remoteReference):
        """
        Store the remote reference as C{self.remote}; pass None to drop it.
        """
        self.remote = remoteReference

    def hasRemoteReference(self):
        """
        @returns: True if a remote reference is currently set
        @rtype:   bool
        """
        return self.remote is not None

    def callRemote(self, name, *args, **kwargs):
        """
        Call the named method on the remote reference.

        Positional and keyword arguments are forwarded unpacked, so the
        remote method receives them exactly as they were given here.

        @param name: name of the remote method
        @type  name: str

        @returns: whatever C{self.remote.callRemote} returns
        """
        return self.remote.callRemote(name, *args, **kwargs)

    def remote_sendFeedReply(self, fullFeedId):
        """
        Stop reading from the PB transport and schedule L{_doFeedTo} with
        the given feed id and that transport.

        @param fullFeedId: the full feed id, passed on to L{_doFeedTo}
        """
        t = self.remote.broker.transport
        # make sure we stop receiving PB messages
        self.debug('stop reading from transport')
        t.stopReading()
        reactor.callLater(0, self._doFeedTo, fullFeedId, t)

    def _doFeedTo(self, fullFeedId, t):
        """
        Detach the socket from the PB transport and fire the deferred
        created in __init__ with (feedId, fd).

        @param fullFeedId: the full feed id to derive the feedId from
        @param t:          the transport whose file descriptor is taken over
        """
        self.debug('flushing PB write queue')
        t.doWrite()
        self.debug('stop writing to transport')
        t.stopWriting()

        # make sure shutdown() is not called on the socket
        t.keepSocketAlive = True

        fd = os.dup(t.fileno())
        # Similar to feedserver._sendFeedReplyCb, but since we are in a
        # callLater, not doReadOrWrite, we call connectionLost directly
        # on the transport.
        t.connectionLost(failure.Failure(main.CONNECTION_DONE))

        # This medium object is of no use any more; drop our reference
        # to the remote so we can avoid cycles.
        self.setRemoteReference(None)

        (flowName, componentName, feedName) = common.parseFullFeedId(fullFeedId)
        feedId = common.feedId(componentName, feedName)

        self.debug('firing deferred with feedId %s on fd %d', feedId,
                   fd)
        self._feedToDeferred.callback((feedId, fd))
301