Package flumotion :: Package component :: Package misc :: Package porter :: Module porter
[hide private]

Source Code for Module flumotion.component.misc.porter.porter

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_porter -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  from urllib2 import urlparse 
 23   
 24  from twisted.internet import protocol, reactor, address, error, defer 
 25   
 26  from twisted.spread import pb 
 27  from twisted.cred import portal 
 28   
 29  from flumotion.common import medium, log, messages 
 30  from flumotion.twisted import credentials, fdserver, checkers 
 31  from flumotion.twisted import reflect 
 32   
 33  from flumotion.component import component 
 34  from flumotion.component.component import moods 
 35   
 36  import socket, string, os, random 
 37   
 38  from flumotion.common.messages import N_ 
 39  T_ = messages.gettexter('flumotion') 
 40   
41 -class PorterAvatar(pb.Avatar, log.Loggable):
42 """ 43 An Avatar in the porter representing a streamer 44 """
45 - def __init__(self, avatarId, porter, mind):
46 self.avatarId = avatarId 47 self.porter = porter 48 49 # The underlying transport is now accessible as 50 # self.mind.broker.transport, on which we can call sendFileDescriptor 51 self.mind = mind
52
53 - def isAttached(self):
54 return self.mind != None
55
56 - def logout(self):
57 self.debug("porter client %s logging out", self.avatarId) 58 self.mind = None
59
60 - def perspective_registerPath(self, path):
61 self.log("Perspective called: registering path \"%s\"" % path) 62 self.porter.registerPath(path, self)
63
64 - def perspective_deregisterPath(self, path):
65 self.log("Perspective called: deregistering path \"%s\"" % path) 66 self.porter.deregisterPath(path, self)
67
68 - def perspective_registerPrefix(self, prefix):
69 self.log("Perspective called: registering default") 70 self.porter.registerPrefix(prefix, self)
71
72 - def perspective_deregisterPrefix(self, prefix):
73 self.log("Perspective called: deregistering default") 74 self.porter.deregisterPrefix(prefix, self)
75
76 -class PorterRealm(log.Loggable):
77 """ 78 A Realm within the Porter that creates Avatars for streamers logging into 79 the porter. 80 """ 81 __implements__ = portal.IRealm 82
83 - def __init__(self, porter):
84 """ 85 @param porter: The porter that avatars created from here should use. 86 @type porter: L{Porter} 87 """ 88 self.porter = porter
89
90 - def requestAvatar(self, avatarId, mind, *interfaces):
91 self.log("Avatar requested for avatarId %s, mind %r, interfaces %r", 92 avatarId, mind, interfaces) 93 if pb.IPerspective in interfaces: 94 avatar = PorterAvatar(avatarId, self.porter, mind) 95 return pb.IPerspective, avatar, avatar.logout 96 else: 97 raise NotImplementedError("no interface")
98
99 -class PorterMedium(component.BaseComponentMedium):
100
101 - def remote_getPorterDetails(self):
102 """ 103 Return the location, login username/password, and listening port 104 and interface for the porter as a tuple (path, username, 105 password, port, interface). 106 """ 107 return (self.comp._socketPath, self.comp._username, 108 self.comp._password, self.comp._iptablesPort, 109 self.comp._interface)
110
111 -class Porter(component.BaseComponent, log.Loggable):
112 """ 113 The porter optionally sits in front of a set of streamer components. 114 The porter is what actually deals with incoming connections on a TCP socket. 115 It decides which streamer to direct the connection to, then passes the FD 116 (along with some amount of already-read data) to the appropriate streamer. 117 """ 118 119 componentMediumClass = PorterMedium 120
121 - def init(self):
122 # We maintain a map of path -> avatar (the underlying transport is 123 # accessible from the avatar, we need this for FD-passing) 124 self._mappings = {} 125 self._prefixes = {} 126 127 self._socketlistener = None 128 129 self._socketPath = None 130 self._username = None 131 self._password = None 132 self._port = None 133 self._iptablesPort = None 134 self._porterProtocol = None 135 136 self._interface = ''
137
138 - def registerPath(self, path, avatar):
139 """ 140 Register a path as being served by a streamer represented by this 141 avatar. Will remove any previous registration at this path. 142 143 @param path: The path to register 144 @type path: str 145 @param avatar: The avatar representing the streamer to direct this path 146 to 147 @type avatar: L{PorterAvatar} 148 """ 149 self.debug("Registering porter path \"%s\" to %r" % (path, avatar)) 150 if self._mappings.has_key(path): 151 self.warning("Replacing existing mapping for path \"%s\"" % path) 152 153 self._mappings[path] = avatar
154
155 - def deregisterPath(self, path, avatar):
156 """ 157 Attempt to deregister the given path. A deregistration will only be 158 accepted if the mapping is to the avatar passed. 159 160 @param path: The path to deregister 161 @type path: str 162 @param avatar: The avatar representing the streamer being deregistered 163 @type avatar: L{PorterAvatar} 164 """ 165 if self._mappings.has_key(path): 166 if self._mappings[path] == avatar: 167 self.debug("Removing porter mapping for \"%s\"" % path) 168 del self._mappings[path] 169 else: 170 self.warning("Mapping not removed: refers to a different avatar") 171 else: 172 self.warning("Mapping not removed: no mapping found")
173
174 - def registerPrefix(self, prefix, avatar):
175 """ 176 Register a destination for all requests directed to anything beginning 177 with a specified prefix. Where there are multiple matching prefixes, the 178 longest is selected. 179 180 @param avatar: The avatar being registered 181 @type avatar: L{PorterAvatar} 182 """ 183 184 self.debug("Setting prefix \"%s\" for porter", prefix) 185 if prefix in self._prefixes: 186 self.warning("Overwriting prefix") 187 188 self._prefixes[prefix] = avatar
189
190 - def deregisterPrefix(self, prefix, avatar):
191 """ 192 Attempt to deregister a default destination for all requests not 193 directed to a specifically-mapped path. This will only succeed if the 194 default is currently equal to this avatar. 195 196 @param avatar: The avatar being deregistered 197 @type avatar: L{PorterAvatar} 198 """ 199 if prefix not in self._prefixes: 200 self.warning("Mapping not removed: no mapping found") 201 return 202 203 if self._prefixes[prefix] == avatar: 204 self.debug("Removing prefix destination from porter") 205 del self._prefixes[prefix] 206 else: 207 self.warning("Not removing prefix destination: expected avatar not found")
208
209 - def findPrefixMatch(self, path):
210 found = None 211 # TODO: Horribly inefficient. Replace with pathtree code. 212 for prefix in self._prefixes.keys(): 213 self.log("Checking: %r, %r" % (prefix, path)) 214 if (path.startswith(prefix) and (not found or len(found) < len(prefix))): 215 found = prefix 216 if found: 217 return self._prefixes[found] 218 else: 219 return None
220
221 - def findDestination(self, path):
222 """ 223 Find a destination Avatar for this path. 224 @returns: The Avatar for this mapping, or None. 225 """ 226 227 if self._mappings.has_key(path): 228 return self._mappings[path] 229 else: 230 return self.findPrefixMatch(path)
231 232
233 - def generateSocketPath(self):
234 """ 235 Generate a socket pathname in an appropriate location 236 """ 237 # Also see worker/worker.py:_getSocketPath(), and note that this suffers 238 # from the same potential race. 239 import tempfile 240 fd, name = tempfile.mkstemp('.%d' % os.getpid(), 'flumotion.porter.') 241 os.close(fd) 242 243 return name
244
245 - def generateRandomString(self, numchars):
246 """ 247 Generate a random US-ASCII string of length numchars 248 """ 249 str = "" 250 chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" 251 for _ in range(numchars): 252 str += chars[random.randint(0, len(chars)-1)] 253 254 return str
255
256 - def have_properties(self):
257 props = self.config['properties'] 258 259 self.fixRenamedProperties(props, 260 [('socket_path', 'socket-path')]) 261 262 # We can operate in two modes: explicitly configured (neccesary if you 263 # want to handle connections from components in other managers), and 264 # self-configured (which is sufficient for slaving only streamers 265 # within this manager 266 if props.has_key('socket-path'): 267 # Explicitly configured 268 self._socketPath = props['socket-path'] 269 self._username = props['username'] 270 self._password = props['password'] 271 else: 272 # Self-configuring. Use a randomly create username/password, and 273 # a socket with a random name. 274 self._username = self.generateRandomString(12) 275 self._password = self.generateRandomString(12) 276 self._socketPath = self.generateSocketPath() 277 278 self._port = int(props['port']) 279 self._iptablesPort = int(props.get('iptables-port', self._port)) 280 self._porterProtocol = props.get('protocol', 281 'flumotion.component.misc.porter.porter.HTTPPorterProtocol') 282 self._interface = props.get('interface', '')
283
284 - def do_stop(self):
285 d = None 286 if self._socketlistener: 287 # stopListening() calls (via a callLater) connectionLost(), which 288 # will unlink our socket, so we don't need to explicitly delete it. 289 d = self._socketlistener.stopListening() 290 self._socketlistener = None 291 return d
292
293 - def do_setup(self):
294 # Create our combined PB-server/fd-passing channel 295 self.have_properties() 296 realm = PorterRealm(self) 297 checker = checkers.FlexibleCredentialsChecker() 298 checker.addUser(self._username, self._password) 299 300 p = portal.Portal(realm, [checker]) 301 serverfactory = pb.PBServerFactory(p) 302 303 # FIXME: shouldn't we be raising handled errors here? 304 305 try: 306 # Rather than a normal listenTCP() or listenUNIX(), we use 307 # listenWith so that we can specify our particular Port, which 308 # creates Transports that we know how to pass FDs over. 309 try: 310 os.unlink(self._socketPath) 311 except: 312 pass 313 314 self._socketlistener = reactor.listenWith( 315 fdserver.FDPort, self._socketPath, serverfactory) 316 self.debug("Now listening on socketPath %s" % self._socketPath) 317 except error.CannotListenError, e: 318 self.warning("Failed to create socket %s" % self._socketPath) 319 m = messages.Error(T_(N_( 320 "Network error: socket path %s is not available."), 321 self._socketPath)) 322 self.addMessage(m) 323 self.setMood(moods.sad) 324 return defer.fail(e) 325 326 # Create the class that deals with the specific protocol we're proxying 327 # in this porter. 328 try: 329 proto = reflect.namedAny(self._porterProtocol) 330 self.debug("Created proto %r" % proto) 331 except: 332 self.warning("Failed to import protocol '%s', defaulting to HTTP" % 333 self._porterProtocol) 334 proto = HTTPPorterProtocol 335 336 # And of course we also want to listen for incoming requests in the 337 # appropriate protocol (HTTP, RTSP, etc.) 338 factory = PorterProtocolFactory(self, proto) 339 try: 340 reactor.listenWith( 341 fdserver.PassableServerPort, self._port, factory, 342 interface=self._interface) 343 self.debug("Now listening on port %d" % self._port) 344 except error.CannotListenError, e: 345 self.warning("Failed to listen on port %d" % self._port) 346 m = messages.Error(T_(N_( 347 "Network error: TCP port %d is not available."), self._port)) 348 self.addMessage(m) 349 self.setMood(moods.sad) 350 return defer.fail(e)
351
352 -class PorterProtocolFactory(protocol.Factory):
353 - def __init__(self, porter, protocol):
354 self._porter = porter 355 self.protocol = protocol
356
357 - def buildProtocol(self, addr):
358 p = self.protocol(self._porter) 359 p.factory = self 360 return p
361
362 -class PorterProtocol(protocol.Protocol, log.Loggable):
363 """ 364 The base porter is capable of accepting HTTP-like protocols (including 365 RTSP) - it reads the first line of a request, and makes the decision 366 solely on that. 367 368 We can't guarantee that we read precisely a line, so the buffer we 369 accumulate will actually be larger than what we actually parse. 370 371 @cvar MAX_SIZE: the maximum number of bytes allowed for the first line 372 @cvar delimiters: a list of valid line delimiters I check for 373 """ 374 # Don't permit a first line longer than this. 375 MAX_SIZE = 4096 376 377 # Timeout any client connected to the porter for longer than this. A normal 378 # client should only ever be connected for a fraction of a second. 379 PORTER_CLIENT_TIMEOUT = 30 380 381 # In fact, because we check \r, we'll never need to check for \r\n - we 382 # leave this in as \r\n is the more correct form. At the other end, this 383 # gets processed by a full protocol implementation, so being flexible hurts 384 # us not at all 385 delimiters = ['\r\n', '\n', '\r'] 386
387 - def __init__(self, porter):
388 self._buffer = '' 389 self._porter = porter 390 391 self.debug("Accepted connection") 392 393 self._timeoutDC = reactor.callLater(self.PORTER_CLIENT_TIMEOUT, 394 self._timeout)
395
396 - def _timeout(self):
397 self._timeoutDC = None 398 self.debug("Timing out porter client after %d seconds", 399 self.PORTER_CLIENT_TIMEOUT) 400 self.transport.loseConnection()
401
402 - def connectionLost(self, reason):
403 if self._timeoutDC: 404 self._timeoutDC.cancel() 405 self._timeoutDC = None
406
407 - def dataReceived(self, data):
408 self._buffer = self._buffer + data 409 self.log("Got data, buffer now \"%s\"" % self._buffer) 410 # We accept more than just '\r\n' (the true HTTP line end) in the 411 # interests of compatibility. 412 for delim in self.delimiters: 413 try: 414 line, remaining = self._buffer.split(delim, 1) 415 break 416 except ValueError: 417 # We didn't find this delimiter; continue with the others. 418 pass 419 else: 420 # Failed to find a valid delimiter. 421 self.log("No valid delimiter found") 422 if len(self._buffer) > self.MAX_SIZE: 423 self.log("Dropping connection!") 424 return self.transport.loseConnection() 425 else: 426 # No delimiter found; haven't reached the length limit yet. 427 # Wait for more data. 428 return 429 430 # Got a line. self._buffer is still our entire buffer, should be 431 # provided to the slaved process. 432 identifier = self.parseLine(line) 433 434 if not identifier: 435 self.log("Couldn't find identifier in first line") 436 return self.transport.loseConnection() 437 438 # Ok, we have an identifier. Is it one we know about, or do we have 439 # a default destination? 440 destinationAvatar = self._porter.findDestination(identifier) 441 442 if not destinationAvatar or not destinationAvatar.isAttached(): 443 if destinationAvatar: 444 self.debug("There was an avatar, but it logged out?") 445 self.debug("No destination avatar found for \"%s\"" % identifier) 446 self.writeNotFoundResponse() 447 return self.transport.loseConnection() 448 449 # Transfer control over this FD. Pass all the data so-far received 450 # along in the same message. The receiver will push that data into 451 # the Twisted Protocol object as if it had been normally received, 452 # so it looks to the receiver like it has read the entire data stream 453 # itself. 454 455 # TODO: Check out blocking characteristics of sendFileDescriptor, fix 456 # if it blocks. 457 self.debug("Attempting to send FD: %d" % self.transport.fileno()) 458 destinationAvatar.mind.broker.transport.sendFileDescriptor( 459 self.transport.fileno(), self._buffer) 460 461 # After this, we don't want to do anything with the FD, other than 462 # close our reference to it - but not close the actual TCP connection. 463 # We set keepSocketAlive to make loseConnection() only call close() 464 # rather than shutdown() then close() 465 self.transport.keepSocketAlive = True 466 self.transport.loseConnection()
467
468 - def parseLine(self, line):
469 """ 470 Parse the initial line of the response. Return a string usable for 471 uniquely identifying the stream being requested, or None if the request 472 is unreadable. 473 474 Subclasses should override this. 475 """ 476 raise NotImplementedError
477
478 - def writeNotFoundResponse(self):
479 """ 480 Write a response indicating that the requested resource was not found 481 in this protocol. 482 483 Subclasses should override this to use the correct protocol. 484 """ 485 raise NotImplementedError
486
487 -class HTTPPorterProtocol(PorterProtocol):
488 scheme = 'http' 489 protos = ["HTTP/1.0", "HTTP/1.1"] 490
491 - def parseLine(self, line):
492 try: 493 (method, location, proto) = map(string.strip, line.split(' ', 2)) 494 495 if proto not in self.protos: 496 return None 497 498 # Currently, we just return the path part of the URL. 499 # Use the URL parsing from urllib2. 500 location = urlparse.urlparse(location, 'http')[2] 501 self.log('parsed %s %s %s' % (method, location, proto)) 502 if not location or location == '': 503 return None 504 505 return location 506 507 except ValueError: 508 return None
509
510 - def writeNotFoundResponse(self):
511 self.transport.write("HTTP/1.0 404 Not Found\r\n\r\nResource unknown")
512
513 -class RTSPPorterProtocol(HTTPPorterProtocol):
514 scheme = 'rtsp' 515 protos = ["RTSP/1.0"] 516
517 - def writeNotFoundResponse(self):
518 self.transport.write("RTSP/1.0 404 Not Found\r\n\r\nResource unknown")
519