Package flumotion :: Package component :: Package base :: Module http
[hide private]

Source Code for Module flumotion.component.base.http

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_http -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import struct 
 23  import socket 
 24   
 25  from twisted.web import http, server 
 26  from twisted.web import resource as web_resource 
 27  from twisted.internet import reactor, defer 
 28  from twisted.python import reflect, failure 
 29   
 30  from flumotion.configure import configure 
 31  from flumotion.common import errors 
 32  from flumotion.twisted.credentials import cryptChallenge 
 33   
 34  from flumotion.common import common, log, keycards 
 35   
 36  #__all__ = ['HTTPStreamingResource', 'MultifdSinkStreamer'] 
 37   
 38  HTTP_SERVER_NAME = 'FlumotionHTTPServer' 
 39  HTTP_SERVER_VERSION = configure.version 
 40   
 41  ERROR_TEMPLATE = """<!doctype html public "-//IETF//DTD HTML 2.0//EN"> 
 42  <html> 
 43  <head> 
 44    <title>%(code)d %(error)s</title> 
 45  </head> 
 46  <body> 
 47  <h2>%(code)d %(error)s</h2> 
 48  </body> 
 49  </html> 
 50  """ 
 51   
 52  HTTP_SERVER = '%s/%s' % (HTTP_SERVER_NAME, HTTP_SERVER_VERSION) 
 53   
 54  ### This is new Issuer code that eventually should move to e.g. 
 55  ### flumotion.common.keycards or related 
 56   
class Issuer(log.Loggable):
    """
    I am the abstract base class shared by all Issuers.

    An issuer hands out keycards of a given class, derived from some
    object (an incoming HTTP request, ...).
    """

    def issue(self, *args, **kwargs):
        """
        Build a keycard from the given arguments.

        Subclasses are expected to override this and return a keycard,
        or None when no keycard can be issued.

        @raise NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()
68
class HTTPGenericIssuer(Issuer):
    """
    I issue L{flumotion.common.keycards.Keycard} objects from nothing more
    than a plain HTTP request.  This suits authentication that relies on
    server-side checks (time of day, for example) instead of credentials
    supplied by the client.
    """

    def issue(self, request):
        """
        Create a generic keycard.

        @param request: the incoming HTTP request; it is not inspected.

        @returns: a fresh L{flumotion.common.keycards.KeycardGeneric}
        """
        genericCard = keycards.KeycardGeneric()
        self.debug("Asking for authentication, generic HTTP")
        return genericCard
79
class HTTPAuthIssuer(Issuer):
    """
    I create L{flumotion.common.keycards.KeycardUACPP} keycards based on
    an incoming L{twisted.protocols.http.Request} request's standard
    HTTP authentication information.
    """

    def issue(self, request):
        """
        Create a keycard from the request's HTTP auth credentials.

        @param request: the incoming HTTP request; its user, password and
                        client IP are copied onto the keycard.

        @returns: a L{flumotion.common.keycards.KeycardUACPP}
        """
        # for now, we're happy with a UACPP keycard; the password arrives
        # plaintext anyway
        keycard = keycards.KeycardUACPP(
            request.getUser(),
            request.getPassword(), request.getClientIP())
        # The password is deliberately left out of the log message:
        # credentials must not end up in debug logs.
        self.debug('Asking for authentication, user %s, ip %s',
                   keycard.username, keycard.address)
        return keycard
95
class HTTPTokenIssuer(Issuer):
    """
    I create L{flumotion.common.keycards.KeycardToken} keycards based on
    an incoming L{twisted.protocols.http.Request} request's GET "token"
    parameter.
    """

    def issue(self, request):
        """
        Create a keycard from the request's "token" argument.

        @param request: the incoming HTTP request; its C{args} mapping and
                        client IP are read.

        @returns: a L{flumotion.common.keycards.KeycardToken}, or None if
                  the request carries no "token" argument.
        """
        # direct membership test on the mapping; no intermediate key list
        if 'token' not in request.args:
            return None

        # args can have lists as values, if more than one specified;
        # in that case the first value wins
        token = request.args['token']
        if not isinstance(token, str):
            token = token[0]

        return keycards.KeycardToken(token, request.getClientIP())

# Key under which HTTPAuthentication looks for a bouncer plug in its
# component's `plugs` mapping.
BOUNCER_SOCKET = 'flumotion.component.bouncers.plug.BouncerPlug'

class HTTPAuthentication(log.Loggable):
    """
    Helper object for handling HTTP authentication for twisted.web
    Resources, using issuers and bouncers.

    A keycard is obtained from the configured issuer for each request and
    is then checked against, in order of preference: a bouncer plug on the
    component, a remote bouncer reached through the component's medium, or
    nothing at all (in which case the keycard is accepted as is).
    """

    logCategory = 'httpauth'

    # time-to-live put on keycards sent to a remote bouncer, and passed
    # along with every keepAlive call
    KEYCARD_TTL = 60 * 60
    # delay before the next keepAlive in the normal case
    KEYCARD_KEEPALIVE_INTERVAL = 20 * 60
    # delay before the next keepAlive after a failed one
    KEYCARD_TRYAGAIN_INTERVAL = 1 * 60

    def __init__(self, component):
        """
        @param component: the component I authenticate for; I use its
                          name, its C{plugs} mapping, its C{medium} and
                          its C{remove_client} method.
        """
        self.component = component
        self._fdToKeycard = {}       # request fd -> Keycard
        self._idToKeycard = {}       # keycard id -> Keycard
        self._fdToDurationCall = {}  # request fd -> IDelayedCall for duration
        self._domain = None          # used for auth challenge and on keycard
        self._issuer = HTTPAuthIssuer()  # issues keycards; default for compat
        self.bouncerName = None
        self.setRequesterId(component.getName())
        self._defaultDuration = None  # default duration to use if the
                                      # keycard doesn't specify one.
        self._pendingCleanups = []   # (bouncerName, keycard) pairs whose
                                     # removal failed and must be retried
        self._keepAlive = None       # pending IDelayedCall for keepAlive

        if (BOUNCER_SOCKET in self.component.plugs
            and self.component.plugs[BOUNCER_SOCKET]):
            assert len(self.component.plugs[BOUNCER_SOCKET]) == 1
            self.plug = self.component.plugs[BOUNCER_SOCKET][0]
        else:
            self.plug = None

    def scheduleKeepAlive(self, tryingAgain=False):
        """
        Schedule the next keepAlive call to the bouncer.

        When the scheduled call fires and a bouncer name is set, keepAlive
        is invoked and, once it completes, the next one is scheduled: after
        KEYCARD_TRYAGAIN_INTERVAL if it failed, after
        KEYCARD_KEEPALIVE_INTERVAL otherwise.  Without a bouncer name the
        call simply reschedules itself.

        @param tryingAgain: whether to use the (shorter) retry interval.
        @type  tryingAgain: bool
        """
        def timeout():
            # The delayed call stored in self._keepAlive has just fired.
            # Forget it right away so that stopKeepAlive() cannot try to
            # cancel an already-called IDelayedCall while the bouncer
            # request is still in flight.
            # NOTE(review): a stopKeepAlive() issued during that window
            # still does not prevent reschedule() from scheduling the next
            # call -- confirm whether callers need that guarantee.
            self._keepAlive = None

            def reschedule(res):
                # used both as callback and errback; returning None here
                # consumes a failure
                if isinstance(res, failure.Failure):
                    self.info('keepAlive failed, rescheduling in %d '
                              'seconds', self.KEYCARD_TRYAGAIN_INTERVAL)
                    self.scheduleKeepAlive(tryingAgain=True)
                else:
                    self.info('keepAlive successful')
                    self.scheduleKeepAlive(tryingAgain=False)

            if self.bouncerName is not None:
                self.debug('calling keepAlive on bouncer %s',
                           self.bouncerName)
                d = self.keepAlive(self.bouncerName, self.issuerName,
                                   self.KEYCARD_TTL)
                d.addCallbacks(reschedule, reschedule)
            else:
                self.scheduleKeepAlive()

        if tryingAgain:
            self._keepAlive = reactor.callLater(
                self.KEYCARD_TRYAGAIN_INTERVAL, timeout)
        else:
            self._keepAlive = reactor.callLater(
                self.KEYCARD_KEEPALIVE_INTERVAL, timeout)

    def stopKeepAlive(self):
        """
        Cancel the pending keepAlive call, if there is one.
        """
        if self._keepAlive is not None:
            self._keepAlive.cancel()
            self._keepAlive = None

    def setDomain(self, domain):
        """
        Set a domain name on the resource, used in HTTP auth challenges and
        on the keycard.

        @type domain: string
        """
        self._domain = domain

    def setBouncerName(self, bouncerName):
        """
        Set the name of the remote bouncer to authenticate against.
        """
        self.bouncerName = bouncerName

    def setRequesterId(self, requesterId):
        """
        Set the requester id put on keycards, and derive a new issuer
        name from it.
        """
        self.requesterId = requesterId
        # make something uniquey
        self.issuerName = str(self.requesterId) + '-' + cryptChallenge()

    def setDefaultDuration(self, defaultDuration):
        """
        Set the duration used for keycards that do not specify one.
        """
        self._defaultDuration = defaultDuration

    def setIssuerClass(self, issuerClass):
        """
        Select the issuer used to create keycards.

        @param issuerClass: one of 'HTTPTokenIssuer', 'HTTPAuthIssuer' or
                            'HTTPGenericIssuer'.
        @type  issuerClass: string

        @raise ValueError: if issuerClass is not one of the accepted names.
        """
        # FIXME: in the future, we want to make this pluggable and have it
        # look up somewhere ?
        if issuerClass == 'HTTPTokenIssuer':
            self._issuer = HTTPTokenIssuer()
        elif issuerClass == 'HTTPAuthIssuer':
            self._issuer = HTTPAuthIssuer()
        elif issuerClass == 'HTTPGenericIssuer':
            self._issuer = HTTPGenericIssuer()
        else:
            raise ValueError("issuerClass %s not accepted" % issuerClass)

    def authenticate(self, request):
        """
        Issue a keycard for the request and have it authenticated.

        Returns: a deferred returning a keycard or None
        """
        keycard = self._issuer.issue(request)
        if not keycard:
            self.debug('no keycard from issuer, firing None')
            return defer.succeed(None)

        keycard.requesterId = self.requesterId
        keycard.issuerName = self.issuerName
        keycard._fd = request.transport.fileno()
        keycard.setDomain(self._domain)

        if self.plug:
            self.debug('authenticating against plug')
            return self.plug.authenticate(keycard)
        elif self.bouncerName is None:
            self.debug('no bouncer, accepting')
            return defer.succeed(keycard)
        else:
            keycard.ttl = self.KEYCARD_TTL
            self.debug('sending keycard to remote bouncer %r',
                       self.bouncerName)
            return self.authenticateKeycard(self.bouncerName, keycard)

    def authenticateKeycard(self, bouncerName, keycard):
        """
        Ask the component's medium to authenticate the keycard against
        the named bouncer.
        """
        return self.component.medium.authenticate(bouncerName, keycard)

    def keepAlive(self, bouncerName, issuerName, ttl):
        """
        Forward a keepAlive for the given issuer name and ttl to the named
        bouncer through the component's medium.
        """
        return self.component.medium.keepAlive(bouncerName, issuerName, ttl)

    def cleanupKeycard(self, bouncerName, keycard):
        """
        Ask the named bouncer, through the component's medium, to remove
        the keycard by id.
        """
        return self.component.medium.removeKeycardId(bouncerName, keycard.id)

    # FIXME: check this
    def clientDone(self, fd):
        """
        Ask the component to remove the client on the given fd.
        """
        return self.component.remove_client(fd)

    def doCleanupKeycard(self, bouncerName, keycard):
        """
        Clean up the given keycard, and retry every cleanup that failed
        earlier.  Cleanups that fail (again) are queued for the next call.
        """
        # cleanup this one keycard, and take the opportunity to retry
        # previous failed cleanups
        def cleanup(bouncerName, keycard):
            def cleanupLater(res, pair):
                self.log('failed to clean up keycard %r, will do '
                         'so later', keycard)
                self._pendingCleanups.append(pair)
            d = self.cleanupKeycard(bouncerName, keycard)
            d.addErrback(cleanupLater, (bouncerName, keycard))

        # swap in a fresh list first, so that failures during this round
        # are collected for the next round rather than retried right away
        pending = self._pendingCleanups
        self._pendingCleanups = []
        cleanup(bouncerName, keycard)
        for pendingBouncerName, pendingKeycard in pending:
            cleanup(pendingBouncerName, pendingKeycard)

    # public
    def cleanupAuth(self, fd):
        """
        Forget all authentication state kept for the given fd: drop its
        keycard (asking the bouncer to remove it, when a bouncer name is
        set) and cancel its pending duration expiry, if any.
        """
        if self.bouncerName and fd in self._fdToKeycard:
            keycard = self._fdToKeycard[fd]
            del self._fdToKeycard[fd]
            del self._idToKeycard[keycard.id]
            self.debug('[fd %5d] asking bouncer %s to remove keycard id %s',
                       fd, self.bouncerName, keycard.id)
            self.doCleanupKeycard(self.bouncerName, keycard)
        if fd in self._fdToDurationCall:
            self.debug('[fd %5d] canceling later expiration call' % fd)
            self._fdToDurationCall[fd].cancel()
            del self._fdToDurationCall[fd]

    def _durationCallLater(self, fd):
        """
        Expire a client due to a duration expiration.
        """
        self.debug('[fd %5d] duration exceeded, expiring client' % fd)

        # we're called from a callLater, so we've already run; just delete
        if fd in self._fdToDurationCall:
            del self._fdToDurationCall[fd]

        self.debug('[fd %5d] asking streamer to remove client' % fd)
        self.clientDone(fd)

    def expireKeycard(self, keycardId):
        """
        Expire a client's connection associated with the keycard Id.

        @raise KeyError: if no keycard with that id is known.
        """
        keycard = self._idToKeycard[keycardId]
        fd = keycard._fd

        self.debug('[fd %5d] expiring client' % fd)

        if fd in self._fdToDurationCall:
            self.debug('[fd %5d] canceling later expiration call' % fd)
            self._fdToDurationCall[fd].cancel()
            del self._fdToDurationCall[fd]

        self.debug('[fd %5d] asking streamer to remove client' % fd)
        self.clientDone(fd)

    ### resource.Resource methods

    def startAuthentication(self, request):
        """
        Start authenticating the request.

        @returns: a deferred that fires None once the request is
                  authenticated; on failure the errback chain runs
                  L{_authenticatedErrback} and the failure is passed on.
        """
        d = self.authenticate(request)
        d.addCallback(self._authenticatedCallback, request)
        d.addErrback(self._authenticatedErrback, request)

        return d

    def _authenticatedCallback(self, keycard, request):
        """
        Record an authenticated keycard for a GET request and schedule its
        expiry when a duration applies.

        @raise errors.NotAuthenticatedError: if there is no keycard, or if
               a keycard with the same id is already registered.
        """
        # !: since we are a callback, the incoming fd might have gone away
        # and closed
        self.debug('_authenticatedCallback: keycard %r' % keycard)
        if not keycard:
            raise errors.NotAuthenticatedError()

        # properly authenticated
        if request.method == 'GET':
            fd = request.transport.fileno()

            if self.bouncerName:
                if keycard.id in self._idToKeycard:
                    self.warning("Duplicate keycard id: refusing")
                    raise errors.NotAuthenticatedError()

                self._fdToKeycard[fd] = keycard
                self._idToKeycard[keycard.id] = keycard

            # the keycard's own duration takes precedence over the default
            duration = keycard.duration or self._defaultDuration

            if duration:
                self.debug('new connection on %d will expire in %f seconds' % (
                    fd, duration))
                self._fdToDurationCall[fd] = reactor.callLater(
                    duration, self._durationCallLater, fd)

        return None

    def _authenticatedErrback(self, failure, request):
        """
        Answer the request with an "unauthorized" response for unknown
        component and not-authenticated errors; any other failure is
        re-raised by trap().  The failure is returned so it keeps
        propagating.
        """
        failure.trap(errors.UnknownComponentError, errors.NotAuthenticatedError)
        self._handleUnauthorized(request)
        return failure

    def _handleUnauthorized(self, request):
        """
        Write a complete 401 response, with an HTML error page, to the
        request and finish it.  A Basic auth challenge is added when a
        domain is set.
        """
        self.debug('client from %s is unauthorized' % (request.getClientIP()))
        request.setHeader('content-type', 'text/html')
        # NOTE(review): this sends only the version string; HTTP_SERVER
        # ('name/version') is defined in this module and may have been
        # intended here -- confirm before changing the header.
        request.setHeader('server', HTTP_SERVER_VERSION)
        if self._domain:
            request.setHeader('WWW-Authenticate',
                              'Basic realm="%s"' % self._domain)

        error_code = http.UNAUTHORIZED
        request.setResponseCode(error_code)

        # we have to write data ourselves,
        # since we already returned NOT_DONE_YET
        html = ERROR_TEMPLATE % {'code': error_code,
                                 'error': http.RESPONSES[error_code]}
        request.write(html)
        request.finish()
375
class LogFilter:
    """
    I hold a list of IPv4 networks and tell whether a given address lies
    in any of them.
    """

    def __init__(self):
        self.filters = []  # list of (network, mask)

    def addIPFilter(self, filter):
        """
        Add an IP filter of the form IP/prefix-length (CIDR syntax), or just
        a single IP address

        @param filter: the filter definition, e.g. "192.168.1.0/24" or
                       "10.0.0.1".
        @type  filter: string

        @raise errors.ConfigError: if the definition cannot be parsed, the
               prefix length is not an integer in the range 0-32, or the
               network address is not a valid IPv4 address.
        """
        definition = filter.split('/')
        if len(definition) == 2:
            (net, prefixlen) = definition
            try:
                prefixlen = int(prefixlen)
            except ValueError:
                # report a malformed prefix like any other parse failure
                raise errors.ConfigError(
                    "Cannot parse filter definition %s" % filter)
        elif len(definition) == 1:
            # a bare address matches only itself
            net = definition[0]
            prefixlen = 32
        else:
            raise errors.ConfigError(
                "Cannot parse filter definition %s" % filter)

        if prefixlen < 0 or prefixlen > 32:
            raise errors.ConfigError("Invalid prefix length")

        # all bits set except the low (32 - prefixlen) host bits
        mask = ~((1 << (32 - prefixlen)) - 1)
        try:
            net = struct.unpack(">I", socket.inet_pton(socket.AF_INET, net))[0]
        except (socket.error, TypeError, ValueError):
            raise errors.ConfigError("Failed to parse network address %s" % net)
        net = net & mask  # just in case

        self.filters.append((net, mask))

    def isInRange(self, ip):
        """
        Return true if ip is in any of the defined network(s) for this filter

        An address that is not a valid IPv4 address is in none of them, so
        False is returned for it.
        """
        # Handles IPv4 only.
        try:
            realip = struct.unpack(
                ">I", socket.inet_pton(socket.AF_INET, ip))[0]
        except (socket.error, TypeError, ValueError):
            return False
        for net, mask in self.filters:
            if (realip & mask) == net:
                return True
        return False
418