Package flumotion :: Package component :: Package consumers :: Package httpstreamer :: Module http
[hide private]

Source Code for Module flumotion.component.consumers.httpstreamer.http

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_http -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import time 
 23   
 24  import gobject 
 25  import gst 
 26   
 27  # socket needed to get hostname 
 28  import socket 
 29   
 30  from twisted.internet import reactor, error, defer 
 31  from twisted.web import server 
 32  from twisted.cred import credentials 
 33  from zope.interface import implements 
 34   
 35  from flumotion.component import feedcomponent 
 36  from flumotion.common import bundle, common, gstreamer, errors, pygobject 
 37  from flumotion.common import messages, netutils, log, interfaces 
 38   
 39  from flumotion.twisted import fdserver 
 40  from flumotion.component.misc.porter import porterclient 
 41   
 42  # proxy import 
 43  from flumotion.component.component import moods 
 44  from flumotion.common.pygobject import gsignal 
 45   
 46  from flumotion.component.consumers.httpstreamer import resources 
 47  from flumotion.component.base import http 
 48   
 49  from flumotion.common.messages import N_ 
 50  T_ = messages.gettexter('flumotion') 
 51   
 52  __all__ = ['HTTPMedium', 'MultifdSinkStreamer'] 
 53   
 54   
 55  STATS_POLL_INTERVAL = 10 
 56   
 57  UI_UPDATE_THROTTLE_PERIOD = 2.0 # Don't update UI more than once every two  
 58                                  # seconds 
 59   
 60  # FIXME: generalize this class and move it out here ? 
61 -class Stats:
62 - def __init__(self, sink):
63 self.sink = sink 64 65 self.no_clients = 0 66 self.clients_added_count = 0 67 self.clients_removed_count = 0 68 self.start_time = time.time() 69 # keep track of the highest number and the last epoch this was reached 70 self.peak_client_number = 0 71 self.peak_epoch = self.start_time 72 self.load_deltas = [0, 0] 73 self._load_deltas_period = 10 # seconds 74 self._load_deltas_ongoing = [time.time(), 0, 0] 75 self._currentBitrate = -1 # not known yet 76 self._lastBytesReceived = -1 # not known yet 77 78 # keep track of average clients by tracking last average and its time 79 self.average_client_number = 0 80 self.average_time = self.start_time 81 82 self.hostname = "localhost" 83 self.port = 0 84 self.mountPoint = "/"
85
86 - def _updateAverage(self):
87 # update running average of clients connected 88 now = time.time() 89 # calculate deltas 90 dt1 = self.average_time - self.start_time 91 dc1 = self.average_client_number 92 dt2 = now - self.average_time 93 dc2 = self.no_clients 94 self.average_time = now # we can update now that we used self.av 95 if dt1 == 0: 96 # first measurement 97 self.average_client_number = 0 98 else: 99 dt = dt1 + dt2 100 before = (dc1 * dt1) / dt 101 after = dc2 * dt2 / dt 102 self.average_client_number = before + after
103
104 - def clientAdded(self):
105 self._updateAverage() 106 107 self.no_clients += 1 108 self.clients_added_count +=1 109 110 # >= so we get the last epoch this peak was achieved 111 if self.no_clients >= self.peak_client_number: 112 self.peak_epoch = time.time() 113 self.peak_client_number = self.no_clients
114
115 - def clientRemoved(self):
116 self._updateAverage() 117 self.no_clients -= 1 118 self.clients_removed_count +=1
119
120 - def _updateStats(self):
121 """ 122 Periodically, update our statistics on load deltas, and update the 123 UIState with new values for total bytes, bitrate, etc. 124 """ 125 126 oldtime, oldadd, oldremove = self._load_deltas_ongoing 127 add, remove = self.clients_added_count, self.clients_removed_count 128 now = time.time() 129 diff = float(now - oldtime) 130 131 self.load_deltas = [(add-oldadd)/diff, (remove-oldremove)/diff] 132 self._load_deltas_ongoing = [now, add, remove] 133 134 bytesReceived = self.getBytesReceived() 135 if self._lastBytesReceived >= 0: 136 self._currentBitrate = ((bytesReceived - self._lastBytesReceived) * 137 8 / STATS_POLL_INTERVAL) 138 self._lastBytesReceived = bytesReceived 139 140 self.update_ui_state() 141 142 self._updateCallLaterId = reactor.callLater(STATS_POLL_INTERVAL, 143 self._updateStats)
144
145 - def getCurrentBitrate(self):
146 if self._currentBitrate >= 0: 147 return self._currentBitrate 148 else: 149 return self.getBytesReceived() * 8 / self.getUptime()
150
151 - def getBytesSent(self):
152 return self.sink.get_property('bytes-served')
153
154 - def getBytesReceived(self):
155 return self.sink.get_property('bytes-to-serve')
156
157 - def getUptime(self):
158 return time.time() - self.start_time
159
160 - def getClients(self):
161 return self.no_clients
162
163 - def getPeakClients(self):
164 return self.peak_client_number
165
166 - def getPeakEpoch(self):
167 return self.peak_epoch
168
169 - def getAverageClients(self):
170 return self.average_client_number
171
172 - def getUrl(self):
173 return "http://%s:%d%s" % (self.hostname, self.port, self.mountPoint)
174
175 - def getLoadDeltas(self):
176 return self.load_deltas
177
178 - def updateState(self, set):
179 c = self 180 181 bytes_sent = c.getBytesSent() 182 bytes_received = c.getBytesReceived() 183 uptime = c.getUptime() 184 185 set('stream-mime', c.get_mime()) 186 set('stream-url', c.getUrl()) 187 set('stream-uptime', common.formatTime(uptime)) 188 bitspeed = bytes_received * 8 / uptime 189 currentbitrate = self.getCurrentBitrate() 190 set('stream-bitrate', common.formatStorage(bitspeed) + 'bit/s') 191 set('stream-current-bitrate', 192 common.formatStorage(currentbitrate) + 'bit/s') 193 set('stream-totalbytes', common.formatStorage(bytes_received) + 'Byte') 194 set('stream-bitrate-raw', bitspeed) 195 set('stream-totalbytes-raw', bytes_received) 196 197 set('clients-current', str(c.getClients())) 198 set('clients-max', str(c.getMaxClients())) 199 set('clients-peak', str(c.getPeakClients())) 200 set('clients-peak-time', c.getPeakEpoch()) 201 set('clients-average', str(int(c.getAverageClients()))) 202 203 bitspeed = bytes_sent * 8 / uptime 204 set('consumption-bitrate', common.formatStorage(bitspeed) + 'bit/s') 205 set('consumption-totalbytes', common.formatStorage(bytes_sent) + 'Byte') 206 set('consumption-bitrate-raw', bitspeed) 207 set('consumption-totalbytes-raw', bytes_sent)
208
209 -class HTTPMedium(feedcomponent.FeedComponentMedium):
210 - def __init__(self, comp):
211 """ 212 @type comp: L{Stats} 213 """ 214 feedcomponent.FeedComponentMedium.__init__(self, comp)
215
216 - def authenticate(self, bouncerName, keycard):
217 """ 218 @rtype: L{twisted.internet.defer.Deferred} firing a keycard or None. 219 """ 220 d = self.callRemote('authenticate', bouncerName, keycard) 221 return d
222
223 - def keepAlive(self, bouncerName, issuerName, ttl):
224 """ 225 @rtype: L{twisted.internet.defer.Deferred} 226 """ 227 return self.callRemote('keepAlive', bouncerName, issuerName, ttl)
228
229 - def removeKeycardId(self, bouncerName, keycardId):
230 """ 231 @rtype: L{twisted.internet.defer.Deferred} 232 """ 233 return self.callRemote('removeKeycardId', bouncerName, keycardId)
234 235 ### remote methods for manager to call on
236 - def remote_expireKeycard(self, keycardId):
237 self.comp.httpauth.expireKeycard(keycardId)
238
239 - def remote_notifyState(self):
240 self.comp.update_ui_state()
241
242 - def remote_rotateLog(self):
243 self.comp.resource.rotateLogs()
244
245 - def remote_getStreamData(self):
246 return self.comp.getStreamData()
247
248 - def remote_getLoadData(self):
249 return self.comp.getLoadData()
250
251 - def remote_updatePorterDetails(self, path, username, password):
252 return self.comp.updatePorterDetails(path, username, password)
253 254 ### the actual component is a streamer using multifdsink
255 -class MultifdSinkStreamer(feedcomponent.ParseLaunchComponent, Stats):
256 implements(interfaces.IStreamingComponent) 257 258 checkOffset = True 259 260 # this object is given to the HTTPMedium as comp 261 logCategory = 'cons-http' 262 263 pipe_template = 'multifdsink name=sink ' + \ 264 'sync=false ' + \ 265 'recover-policy=3' 266 267 componentMediumClass = HTTPMedium 268
269 - def init(self):
270 reactor.debug = True 271 self.debug("HTTP streamer initialising") 272 273 self.caps = None 274 self.resource = None 275 self.httpauth = None 276 self.mountPoint = None 277 self.burst_on_connect = False 278 279 self.description = None 280 281 self.type = None 282 283 # Used if we've slaved to a porter. 284 self._pbclient = None 285 self._porterUsername = None 286 self._porterPassword = None 287 self._porterPath = None 288 289 # Or if we're a master, we open our own port here. Also used for URLs 290 # in the porter case. 291 self.port = None 292 # We listen on this interface, if set. 293 self.iface = None 294 295 self._tport = None 296 297 self._updateCallLaterId = None 298 self._lastUpdate = 0 299 self._updateUI_DC = None 300 301 self._pending_removals = {} 302 303 for i in ('stream-mime', 'stream-uptime', 'stream-current-bitrate', 304 'stream-bitrate', 'stream-totalbytes', 'clients-current', 305 'clients-max', 'clients-peak', 'clients-peak-time', 306 'clients-average', 'consumption-bitrate', 307 'consumption-totalbytes', 'stream-bitrate-raw', 308 'stream-totalbytes-raw', 'consumption-bitrate-raw', 309 'consumption-totalbytes-raw', 'stream-url'): 310 self.uiState.addKey(i, None)
311
312 - def getDescription(self):
313 return self.description
314
315 - def get_pipeline_string(self, properties):
316 return self.pipe_template
317
318 - def check_properties(self, props, addMessage):
319 320 # F0.6: remove backwards-compatible properties 321 self.fixRenamedProperties(props, [ 322 ('issuer', 'issuer-class'), 323 ('mount_point', 'mount-point'), 324 ('porter_socket_path', 'porter-socket-path'), 325 ('porter_username', 'porter-username'), 326 ('porter_password', 'porter-password'), 327 ('user_limit', 'client-limit'), 328 ('bandwidth_limit', 'bandwidth-limit'), 329 ('burst_on_connect', 'burst-on-connect'), 330 ('burst_size', 'burst-size'), 331 ]) 332 333 if props.get('type', 'master') == 'slave': 334 for k in 'socket-path', 'username', 'password': 335 if not 'porter-' + k in props: 336 raise errors.ConfigError("slave mode, missing required" 337 " property 'porter-%s'" % k) 338 339 if 'burst-size' in props and 'burst-time' in props: 340 raise errors.ConfigError('both burst-size and burst-time ' 341 'set, cannot satisfy') 342 343 # tcp is where multifdsink is 344 version = gstreamer.get_plugin_version('tcp') 345 if version < (0, 10, 9, 1): 346 m = messages.Error(T_(N_( 347 "Version %s of the '%s' GStreamer plug-in is too old.\n"), 348 ".".join(map(str, version)), 'multifdsink')) 349 m.add(T_(N_("Please upgrade '%s' to version %s."), 350 'gst-plugins-base', '0.10.10')) 351 addMessage(m)
352
353 - def time_bursting_supported(self, sink):
354 try: 355 sink.get_property('units-max') 356 return True 357 except TypeError: 358 return False
359
360 - def setup_burst_mode(self, sink):
361 if self.burst_on_connect: 362 if self.burst_time and self.time_bursting_supported(sink): 363 self.debug("Configuring burst mode for %f second burst", 364 self.burst_time) 365 # Set a burst for configurable minimum time, plus extra to 366 # start from a keyframe if needed. 367 sink.set_property('sync-method', 4) # burst-keyframe 368 sink.set_property('burst-unit', 2) # time 369 sink.set_property('burst-value', 370 long(self.burst_time * gst.SECOND)) 371 372 # We also want to ensure that we have sufficient data available 373 # to satisfy this burst; and an appropriate maximum, all 374 # specified in units of time. 375 sink.set_property('time-min', 376 long((self.burst_time + 5) * gst.SECOND)) 377 378 sink.set_property('unit-type', 2) # time 379 sink.set_property('units-soft-max', 380 long((self.burst_time + 8) * gst.SECOND)) 381 sink.set_property('units-max', 382 long((self.burst_time + 10) * gst.SECOND)) 383 elif self.burst_size: 384 self.debug("Configuring burst mode for %d kB burst", 385 self.burst_size) 386 # If we have a burst-size set, use modern 387 # needs-recent-multifdsink behaviour to have complex bursting. 388 # In this mode, we burst a configurable minimum, plus extra 389 # so we start from a keyframe (or less if we don't have a 390 # keyframe available) 391 sink.set_property('sync-method', 'burst-keyframe') 392 sink.set_property('burst-unit', 'bytes') 393 sink.set_property('burst-value', self.burst_size * 1024) 394 395 # To use burst-on-connect, we need to ensure that multifdsink 396 # has a minimum amount of data available - assume 512 kB beyond 397 # the burst amount so that we should have a keyframe available 398 sink.set_property('bytes-min', (self.burst_size + 512) * 1024) 399 400 # And then we need a maximum still further above that - the 401 # exact value doesn't matter too much, but we want it reasonably 402 # small to limit memory usage. multifdsink doesn't give us much 403 # control here, we can only specify the max values in buffers. 404 # We assume each buffer is close enough to 4kB - true for asf 405 # and ogg, at least 406 sink.set_property('buffers-soft-max', 407 (self.burst_size + 1024) / 4) 408 sink.set_property('buffers-max', 409 (self.burst_size + 2048) / 4) 410 411 else: 412 # Old behaviour; simple burst-from-latest-keyframe 413 self.debug("simple burst-on-connect, setting sync-method 2") 414 sink.set_property('sync-method', 2) 415 416 sink.set_property('buffers-soft-max', 250) 417 sink.set_property('buffers-max', 500) 418 else: 419 self.debug("no burst-on-connect, setting sync-method 0") 420 sink.set_property('sync-method', 0) 421 422 sink.set_property('buffers-soft-max', 250) 423 sink.set_property('buffers-max', 500)
424
425 - def configure_pipeline(self, pipeline, properties):
426 Stats.__init__(self, sink=self.get_element('sink')) 427 428 self._updateCallLaterId = reactor.callLater(10, self._updateStats) 429 430 mountPoint = properties.get('mount-point', '') 431 if not mountPoint.startswith('/'): 432 mountPoint = '/' + mountPoint 433 self.mountPoint = mountPoint 434 435 # Hostname is used for a variety of purposes. We do a best-effort guess 436 # where nothing else is possible, but it's much preferable to just 437 # configure this 438 self.hostname = properties.get('hostname', None) 439 self.iface = self.hostname # We listen on this if explicitly configured, 440 # but not if it's only guessed at by the 441 # below code. 442 if not self.hostname: 443 # Don't call this nasty, nasty, probably flaky function unless we 444 # need to. 445 self.hostname = netutils.guess_public_hostname() 446 447 self.description = properties.get('description', None) 448 if self.description is None: 449 self.description = "Flumotion Stream" 450 451 # FIXME: tie these together more nicely 452 self.httpauth = http.HTTPAuthentication(self) 453 self.resource = resources.HTTPStreamingResource(self, 454 self.httpauth) 455 456 # check how to set client sync mode 457 sink = self.get_element('sink') 458 self.burst_on_connect = properties.get('burst-on-connect', False) 459 self.burst_size = properties.get('burst-size', 0) 460 self.burst_time = properties.get('burst-time', 0.0) 461 462 self.setup_burst_mode(sink) 463 464 sink.connect('deep-notify::caps', self._notify_caps_cb) 465 466 # these are made threadsafe using idle_add in the handler 467 sink.connect('client-added', self._client_added_handler) 468 469 # We now require a sufficiently recent multifdsink anyway that we can 470 # use the new client-fd-removed signal 471 sink.connect('client-fd-removed', self._client_fd_removed_cb) 472 sink.connect('client-removed', self._client_removed_cb) 473 474 if properties.has_key('client-limit'): 475 limit = int(properties['client-limit']) 476 self.resource.setUserLimit(limit) 477 if limit != self.resource.maxclients: 478 self.addMessage( 479 messages.Info(T_(N_("Unable to set the maximum " 480 "client limit to %d clients."), 481 limit), 482 debug=("Your system has limited " 483 "the ability to open file " 484 "descriptors. Check your " 485 "limits.conf to see how to " 486 "raise this limit."))) 487 488 if properties.has_key('bandwidth-limit'): 489 limit = int(properties['bandwidth-limit']) 490 if limit < 1000: 491 # The wizard used to set this as being in Mbps, oops. 492 self.debug("Bandwidth limit set to unreasonably low %d bps, " 493 "assuming this is meant to be Mbps", limit) 494 limit *= 1000000 495 self.resource.setBandwidthLimit(limit) 496 497 if properties.has_key('redirect-on-overflow'): 498 self.resource.setRedirectionOnLimits( 499 properties['redirect-on-overflow']) 500 501 if properties.has_key('bouncer'): 502 self.httpauth.setBouncerName(properties['bouncer']) 503 504 if properties.has_key('issuer-class'): 505 self.httpauth.setIssuerClass(properties['issuer-class']) 506 507 if properties.has_key('duration'): 508 self.httpauth.setDefaultDuration(float(properties['duration'])) 509 510 if properties.has_key('domain'): 511 self.httpauth.setDomain(properties['domain']) 512 513 if self.config.has_key('avatarId'): 514 self.httpauth.setRequesterId(self.config['avatarId']) 515 516 if properties.has_key('ip-filter'): 517 filter = http.LogFilter() 518 for f in properties['ip-filter']: 519 filter.addIPFilter(f) 520 self.resource.setLogFilter(filter) 521 522 self.type = properties.get('type', 'master') 523 if self.type == 'slave': 524 # already checked for these in do_check 525 self._porterPath = properties['porter-socket-path'] 526 self._porterUsername = properties['porter-username'] 527 self._porterPassword = properties['porter-password'] 528 529 self.port = int(properties.get('port', 8800))
530
531 - def __repr__(self):
532 return '<MultifdSinkStreamer (%s)>' % self.name
533
534 - def getMaxClients(self):
535 return self.resource.maxclients
536
537 - def get_mime(self):
538 if self.caps: 539 return self.caps.get_structure(0).get_name()
540
541 - def get_content_type(self):
542 mime = self.get_mime() 543 if mime == 'multipart/x-mixed-replace': 544 mime += ";boundary=ThisRandomString" 545 return mime
546
547 - def getUrl(self):
548 return "http://%s:%d%s" % (self.hostname, self.port, self.mountPoint)
549
550 - def getStreamData(self):
551 socket = 'flumotion.component.plugs.streamdata.StreamDataProvider' 552 if self.plugs[socket]: 553 plug = self.plugs[socket][-1] 554 return plug.getStreamData() 555 else: 556 return { 557 'protocol': 'HTTP', 558 'description': self.description, 559 'url' : self.getUrl() 560 }
561
562 - def getLoadData(self):
563 """ 564 Return a tuple (deltaadded, deltaremoved, bytes_transferred, 565 current_clients, current_load) of our current bandwidth and user values. 566 The deltas are estimates of how much bitrate is added, removed 567 due to client connections, disconnections, per second. 568 """ 569 # We calculate the estimated clients added/removed per second, then 570 # multiply by the stream bitrate 571 deltaadded, deltaremoved = self.getLoadDeltas() 572 573 bytes_received = self.getBytesReceived() 574 uptime = self.getUptime() 575 bitrate = bytes_received * 8 / uptime 576 577 bytes_sent = self.getBytesSent() 578 clients_connected = self.getClients() 579 current_load = bitrate * clients_connected 580 581 return (deltaadded * bitrate, deltaremoved * bitrate, bytes_sent, 582 clients_connected, current_load)
583
584 - def add_client(self, fd):
585 sink = self.get_element('sink') 586 sink.emit('add', fd)
587
588 - def remove_client(self, fd):
589 sink = self.get_element('sink') 590 sink.emit('remove', fd)
591
592 - def remove_all_clients(self):
593 """ 594 Remove all the clients. 595 596 Returns a deferred fired once all clients have been removed. 597 """ 598 if self.resource: 599 # can be None if we never went happy 600 self.debug("Asking for all clients to be removed") 601 return self.resource.removeAllClients()
602
603 - def update_ui_state(self):
604 """ 605 Update the uiState object. 606 Such updates (through this function) are throttled to a maximum rate, 607 to avoid saturating admin clients with traffic when many clients are 608 connecting/disconnecting. 609 """ 610 def set(k, v): 611 if self.uiState.get(k) != v: 612 self.uiState.set(k, v)
613 now = time.time() 614 self._updateUI_DC = None 615 616 # If we haven't updated too recently, do it immediately. 617 if now - self._lastUpdate >= UI_UPDATE_THROTTLE_PERIOD: 618 if self._updateUI_DC: 619 self._updateUI_DC.cancel() 620 self._updateUI_DC = None 621 622 self._lastUpdate = now 623 # fixme: have updateState just update what changed itself 624 # without the hack above 625 self.updateState(set) 626 elif not self._updateUI_DC: 627 # Otherwise, schedule doing this in a few seconds (unless an update 628 # was already scheduled) 629 self._updateUI_DC = reactor.callLater(UI_UPDATE_THROTTLE_PERIOD, 630 self.update_ui_state)
631
632 - def _client_added_handler(self, sink, fd):
633 self.log('[fd %5d] client_added_handler', fd) 634 Stats.clientAdded(self) 635 self.update_ui_state()
636
637 - def _client_removed_handler(self, sink, fd, reason, stats):
638 self.log('[fd %5d] client_removed_handler, reason %s', fd, reason) 639 if reason.value_name == 'GST_CLIENT_STATUS_ERROR': 640 self.warning('[fd %5d] Client removed because of write error' % fd) 641 642 self.resource.clientRemoved(sink, fd, reason, stats) 643 Stats.clientRemoved(self) 644 self.update_ui_state()
645 646 ### START OF THREAD-AWARE CODE (called from non-reactor threads) 647
648 - def _notify_caps_cb(self, element, pad, param):
649 caps = pad.get_negotiated_caps() 650 if caps == None: 651 return 652 653 caps_str = gstreamer.caps_repr(caps) 654 self.debug('Got caps: %s' % caps_str) 655 656 if not self.caps == None: 657 self.warning('Already had caps: %s, replacing' % caps_str) 658 659 self.debug('Storing caps: %s' % caps_str) 660 self.caps = caps 661 662 reactor.callFromThread(self.update_ui_state)
663 664 # We now use both client-removed and client-fd-removed. We call get-stats 665 # from the first callback ('client-removed'), but don't actually start 666 # removing the client until we get 'client-fd-removed'. This ensures that 667 # there's no window in which multifdsink still knows about the fd, but we've # actually closed it, so we no longer get spurious duplicates. 668 # this can be called from both application and streaming thread !
669 - def _client_removed_cb(self, sink, fd, reason):
670 stats = sink.emit('get-stats', fd) 671 self._pending_removals[fd] = (stats, reason)
672 673 # this can be called from both application and streaming thread !
674 - def _client_fd_removed_cb(self, sink, fd):
675 (stats, reason) = self._pending_removals.pop(fd) 676 677 reactor.callFromThread(self._client_removed_handler, sink, fd, 678 reason, stats)
679 680 ### END OF THREAD-AWARE CODE 681
682 - def do_stop(self):
683 if self._updateCallLaterId: 684 self._updateCallLaterId.cancel() 685 self._updateCallLaterId = None 686 687 if self.httpauth: 688 self.httpauth.stopKeepAlive() 689 690 if self._tport: 691 self._tport.stopListening() 692 693 # After we stop listening (so new connections aren't possible), 694 # disconnect (and thus log) all the old ones. 695 l = [self.remove_all_clients()] 696 697 if self.type == 'slave' and self._pbclient: 698 l.append(self._pbclient.deregisterPath(self.mountPoint)) 699 700 return defer.DeferredList(l)
701
702 - def updatePorterDetails(self, path, username, password):
703 """ 704 Provide a new set of porter login information, for when we're in slave 705 mode and the porter changes. 706 If we're currently connected, this won't disconnect - it'll just change 707 the information so that next time we try and connect we'll use the 708 new ones 709 """ 710 if self.type == 'slave': 711 self._porterUsername = username 712 self._porterPassword = password 713 714 creds = credentials.UsernamePassword(self._porterUsername, 715 self._porterPassword) 716 self._pbclient.startLogin(creds, self.medium) 717 718 # If we've changed paths, we must do some extra work. 719 if path != self._porterPath: 720 self.debug("Changing porter login to use \"%s\"", path) 721 self._porterPath = path 722 self._pbclient.stopTrying() # Stop trying to connect with the 723 # old connector. 724 self._pbclient.resetDelay() 725 reactor.connectWith( 726 fdserver.FDConnector, self._porterPath, 727 self._pbclient, 10, checkPID=False) 728 else: 729 raise errors.WrongStateError( 730 "Can't specify porter details in master mode")
731
732 - def do_pipeline_playing(self):
733 # Override this to not set the component happy; instead do this once 734 # both the pipeline has started AND we've logged in to the porter. 735 if hasattr(self, '_porterDeferred'): 736 d = self._porterDeferred 737 else: 738 d = defer.succeed(None) 739 self.httpauth.scheduleKeepAlive() 740 d.addCallback(lambda res: feedcomponent.ParseLaunchComponent.do_pipeline_playing(self)) 741 return d
742
743 - def do_setup(self):
744 root = resources.HTTPRoot() 745 # TwistedWeb wants the child path to not include the leading / 746 mount = self.mountPoint[1:] 747 root.putChild(mount, self.resource) 748 if self.type == 'slave': 749 # Streamer is slaved to a porter. 750 751 # We have two things we want to do in parallel: 752 # - ParseLaunchComponent.do_start() 753 # - log in to the porter, then register our mountpoint with 754 # the porter. 755 # So, we return a DeferredList with a deferred for each of 756 # these tasks. The second one's a bit tricky: we pass a dummy 757 # deferred to our PorterClientFactory that gets fired once 758 # we've done all of the tasks the first time (it's an 759 # automatically-reconnecting client factory, and we only fire 760 # this deferred the first time) 761 762 self._porterDeferred = d = defer.Deferred() 763 mountpoints = [self.mountPoint] 764 self._pbclient = porterclient.HTTPPorterClientFactory( 765 server.Site(resource=root), mountpoints, d) 766 767 creds = credentials.UsernamePassword(self._porterUsername, 768 self._porterPassword) 769 self._pbclient.startLogin(creds, self.medium) 770 771 self.debug("Starting porter login at \"%s\"", self._porterPath) 772 # This will eventually cause d to fire 773 reactor.connectWith( 774 fdserver.FDConnector, self._porterPath, 775 self._pbclient, 10, checkPID=False) 776 else: 777 # Streamer is standalone. 778 try: 779 self.debug('Listening on %d' % self.port) 780 iface = self.iface or "" 781 self._tport = reactor.listenTCP(self.port, server.Site(resource=root), 782 interface=iface) 783 except error.CannotListenError: 784 t = 'Port %d is not available.' % self.port 785 self.warning(t) 786 m = messages.Error(T_(N_( 787 "Network error: TCP port %d is not available."), self.port)) 788 self.addMessage(m) 789 self.setMood(moods.sad) 790 return defer.fail(errors.ComponentStartHandledError(t))
791