1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import time
23
24 import gobject
25 import gst
26
27
28 import socket
29
30 from twisted.internet import reactor, error, defer
31 from twisted.web import server
32 from twisted.cred import credentials
33 from zope.interface import implements
34
35 from flumotion.component import feedcomponent
36 from flumotion.common import bundle, common, gstreamer, errors, pygobject
37 from flumotion.common import messages, netutils, log, interfaces
38
39 from flumotion.twisted import fdserver
40 from flumotion.component.misc.porter import porterclient
41
42
43 from flumotion.component.component import moods
44 from flumotion.common.pygobject import gsignal
45
46 from flumotion.component.consumers.httpstreamer import resources
47 from flumotion.component.base import http
48
49 from flumotion.common.messages import N_
50 T_ = messages.gettexter('flumotion')
51
52 __all__ = ['HTTPMedium', 'MultifdSinkStreamer']
53
54
55 STATS_POLL_INTERVAL = 10
56
57 UI_UPDATE_THROTTLE_PERIOD = 2.0
58
59
60
63 self.sink = sink
64
65 self.no_clients = 0
66 self.clients_added_count = 0
67 self.clients_removed_count = 0
68 self.start_time = time.time()
69
70 self.peak_client_number = 0
71 self.peak_epoch = self.start_time
72 self.load_deltas = [0, 0]
73 self._load_deltas_period = 10
74 self._load_deltas_ongoing = [time.time(), 0, 0]
75 self._currentBitrate = -1
76 self._lastBytesReceived = -1
77
78
79 self.average_client_number = 0
80 self.average_time = self.start_time
81
82 self.hostname = "localhost"
83 self.port = 0
84 self.mountPoint = "/"
85
87
88 now = time.time()
89
90 dt1 = self.average_time - self.start_time
91 dc1 = self.average_client_number
92 dt2 = now - self.average_time
93 dc2 = self.no_clients
94 self.average_time = now
95 if dt1 == 0:
96
97 self.average_client_number = 0
98 else:
99 dt = dt1 + dt2
100 before = (dc1 * dt1) / dt
101 after = dc2 * dt2 / dt
102 self.average_client_number = before + after
103
105 self._updateAverage()
106
107 self.no_clients += 1
108 self.clients_added_count +=1
109
110
111 if self.no_clients >= self.peak_client_number:
112 self.peak_epoch = time.time()
113 self.peak_client_number = self.no_clients
114
116 self._updateAverage()
117 self.no_clients -= 1
118 self.clients_removed_count +=1
119
121 """
122 Periodically, update our statistics on load deltas, and update the
123 UIState with new values for total bytes, bitrate, etc.
124 """
125
126 oldtime, oldadd, oldremove = self._load_deltas_ongoing
127 add, remove = self.clients_added_count, self.clients_removed_count
128 now = time.time()
129 diff = float(now - oldtime)
130
131 self.load_deltas = [(add-oldadd)/diff, (remove-oldremove)/diff]
132 self._load_deltas_ongoing = [now, add, remove]
133
134 bytesReceived = self.getBytesReceived()
135 if self._lastBytesReceived >= 0:
136 self._currentBitrate = ((bytesReceived - self._lastBytesReceived) *
137 8 / STATS_POLL_INTERVAL)
138 self._lastBytesReceived = bytesReceived
139
140 self.update_ui_state()
141
142 self._updateCallLaterId = reactor.callLater(STATS_POLL_INTERVAL,
143 self._updateStats)
144
146 if self._currentBitrate >= 0:
147 return self._currentBitrate
148 else:
149 return self.getBytesReceived() * 8 / self.getUptime()
150
152 return self.sink.get_property('bytes-served')
153
155 return self.sink.get_property('bytes-to-serve')
156
158 return time.time() - self.start_time
159
161 return self.no_clients
162
164 return self.peak_client_number
165
167 return self.peak_epoch
168
170 return self.average_client_number
171
173 return "http://%s:%d%s" % (self.hostname, self.port, self.mountPoint)
174
176 return self.load_deltas
177
179 c = self
180
181 bytes_sent = c.getBytesSent()
182 bytes_received = c.getBytesReceived()
183 uptime = c.getUptime()
184
185 set('stream-mime', c.get_mime())
186 set('stream-url', c.getUrl())
187 set('stream-uptime', common.formatTime(uptime))
188 bitspeed = bytes_received * 8 / uptime
189 currentbitrate = self.getCurrentBitrate()
190 set('stream-bitrate', common.formatStorage(bitspeed) + 'bit/s')
191 set('stream-current-bitrate',
192 common.formatStorage(currentbitrate) + 'bit/s')
193 set('stream-totalbytes', common.formatStorage(bytes_received) + 'Byte')
194 set('stream-bitrate-raw', bitspeed)
195 set('stream-totalbytes-raw', bytes_received)
196
197 set('clients-current', str(c.getClients()))
198 set('clients-max', str(c.getMaxClients()))
199 set('clients-peak', str(c.getPeakClients()))
200 set('clients-peak-time', c.getPeakEpoch())
201 set('clients-average', str(int(c.getAverageClients())))
202
203 bitspeed = bytes_sent * 8 / uptime
204 set('consumption-bitrate', common.formatStorage(bitspeed) + 'bit/s')
205 set('consumption-totalbytes', common.formatStorage(bytes_sent) + 'Byte')
206 set('consumption-bitrate-raw', bitspeed)
207 set('consumption-totalbytes-raw', bytes_sent)
208
209 -class HTTPMedium(feedcomponent.FeedComponentMedium):
215
217 """
218 @rtype: L{twisted.internet.defer.Deferred} firing a keycard or None.
219 """
220 d = self.callRemote('authenticate', bouncerName, keycard)
221 return d
222
223 - def keepAlive(self, bouncerName, issuerName, ttl):
224 """
225 @rtype: L{twisted.internet.defer.Deferred}
226 """
227 return self.callRemote('keepAlive', bouncerName, issuerName, ttl)
228
230 """
231 @rtype: L{twisted.internet.defer.Deferred}
232 """
233 return self.callRemote('removeKeycardId', bouncerName, keycardId)
234
235
238
241
244
247
250
253
254
256 implements(interfaces.IStreamingComponent)
257
258 checkOffset = True
259
260
261 logCategory = 'cons-http'
262
263 pipe_template = 'multifdsink name=sink ' + \
264 'sync=false ' + \
265 'recover-policy=3'
266
267 componentMediumClass = HTTPMedium
268
270 reactor.debug = True
271 self.debug("HTTP streamer initialising")
272
273 self.caps = None
274 self.resource = None
275 self.httpauth = None
276 self.mountPoint = None
277 self.burst_on_connect = False
278
279 self.description = None
280
281 self.type = None
282
283
284 self._pbclient = None
285 self._porterUsername = None
286 self._porterPassword = None
287 self._porterPath = None
288
289
290
291 self.port = None
292
293 self.iface = None
294
295 self._tport = None
296
297 self._updateCallLaterId = None
298 self._lastUpdate = 0
299 self._updateUI_DC = None
300
301 self._pending_removals = {}
302
303 for i in ('stream-mime', 'stream-uptime', 'stream-current-bitrate',
304 'stream-bitrate', 'stream-totalbytes', 'clients-current',
305 'clients-max', 'clients-peak', 'clients-peak-time',
306 'clients-average', 'consumption-bitrate',
307 'consumption-totalbytes', 'stream-bitrate-raw',
308 'stream-totalbytes-raw', 'consumption-bitrate-raw',
309 'consumption-totalbytes-raw', 'stream-url'):
310 self.uiState.addKey(i, None)
311
314
317
319
320
321 self.fixRenamedProperties(props, [
322 ('issuer', 'issuer-class'),
323 ('mount_point', 'mount-point'),
324 ('porter_socket_path', 'porter-socket-path'),
325 ('porter_username', 'porter-username'),
326 ('porter_password', 'porter-password'),
327 ('user_limit', 'client-limit'),
328 ('bandwidth_limit', 'bandwidth-limit'),
329 ('burst_on_connect', 'burst-on-connect'),
330 ('burst_size', 'burst-size'),
331 ])
332
333 if props.get('type', 'master') == 'slave':
334 for k in 'socket-path', 'username', 'password':
335 if not 'porter-' + k in props:
336 raise errors.ConfigError("slave mode, missing required"
337 " property 'porter-%s'" % k)
338
339 if 'burst-size' in props and 'burst-time' in props:
340 raise errors.ConfigError('both burst-size and burst-time '
341 'set, cannot satisfy')
342
343
344 version = gstreamer.get_plugin_version('tcp')
345 if version < (0, 10, 9, 1):
346 m = messages.Error(T_(N_(
347 "Version %s of the '%s' GStreamer plug-in is too old.\n"),
348 ".".join(map(str, version)), 'multifdsink'))
349 m.add(T_(N_("Please upgrade '%s' to version %s."),
350 'gst-plugins-base', '0.10.10'))
351 addMessage(m)
352
354 try:
355 sink.get_property('units-max')
356 return True
357 except TypeError:
358 return False
359
361 if self.burst_on_connect:
362 if self.burst_time and self.time_bursting_supported(sink):
363 self.debug("Configuring burst mode for %f second burst",
364 self.burst_time)
365
366
367 sink.set_property('sync-method', 4)
368 sink.set_property('burst-unit', 2)
369 sink.set_property('burst-value',
370 long(self.burst_time * gst.SECOND))
371
372
373
374
375 sink.set_property('time-min',
376 long((self.burst_time + 5) * gst.SECOND))
377
378 sink.set_property('unit-type', 2)
379 sink.set_property('units-soft-max',
380 long((self.burst_time + 8) * gst.SECOND))
381 sink.set_property('units-max',
382 long((self.burst_time + 10) * gst.SECOND))
383 elif self.burst_size:
384 self.debug("Configuring burst mode for %d kB burst",
385 self.burst_size)
386
387
388
389
390
391 sink.set_property('sync-method', 'burst-keyframe')
392 sink.set_property('burst-unit', 'bytes')
393 sink.set_property('burst-value', self.burst_size * 1024)
394
395
396
397
398 sink.set_property('bytes-min', (self.burst_size + 512) * 1024)
399
400
401
402
403
404
405
406 sink.set_property('buffers-soft-max',
407 (self.burst_size + 1024) / 4)
408 sink.set_property('buffers-max',
409 (self.burst_size + 2048) / 4)
410
411 else:
412
413 self.debug("simple burst-on-connect, setting sync-method 2")
414 sink.set_property('sync-method', 2)
415
416 sink.set_property('buffers-soft-max', 250)
417 sink.set_property('buffers-max', 500)
418 else:
419 self.debug("no burst-on-connect, setting sync-method 0")
420 sink.set_property('sync-method', 0)
421
422 sink.set_property('buffers-soft-max', 250)
423 sink.set_property('buffers-max', 500)
424
530
532 return '<MultifdSinkStreamer (%s)>' % self.name
533
535 return self.resource.maxclients
536
538 if self.caps:
539 return self.caps.get_structure(0).get_name()
540
542 mime = self.get_mime()
543 if mime == 'multipart/x-mixed-replace':
544 mime += ";boundary=ThisRandomString"
545 return mime
546
548 return "http://%s:%d%s" % (self.hostname, self.port, self.mountPoint)
549
551 socket = 'flumotion.component.plugs.streamdata.StreamDataProvider'
552 if self.plugs[socket]:
553 plug = self.plugs[socket][-1]
554 return plug.getStreamData()
555 else:
556 return {
557 'protocol': 'HTTP',
558 'description': self.description,
559 'url' : self.getUrl()
560 }
561
563 """
564 Return a tuple (deltaadded, deltaremoved, bytes_transferred,
565 current_clients, current_load) of our current bandwidth and user values.
566 The deltas are estimates of how much bitrate is added, removed
567 due to client connections, disconnections, per second.
568 """
569
570
571 deltaadded, deltaremoved = self.getLoadDeltas()
572
573 bytes_received = self.getBytesReceived()
574 uptime = self.getUptime()
575 bitrate = bytes_received * 8 / uptime
576
577 bytes_sent = self.getBytesSent()
578 clients_connected = self.getClients()
579 current_load = bitrate * clients_connected
580
581 return (deltaadded * bitrate, deltaremoved * bitrate, bytes_sent,
582 clients_connected, current_load)
583
587
591
593 """
594 Remove all the clients.
595
596 Returns a deferred fired once all clients have been removed.
597 """
598 if self.resource:
599
600 self.debug("Asking for all clients to be removed")
601 return self.resource.removeAllClients()
602
604 """
605 Update the uiState object.
606 Such updates (through this function) are throttled to a maximum rate,
607 to avoid saturating admin clients with traffic when many clients are
608 connecting/disconnecting.
609 """
610 def set(k, v):
611 if self.uiState.get(k) != v:
612 self.uiState.set(k, v)
613 now = time.time()
614 self._updateUI_DC = None
615
616
617 if now - self._lastUpdate >= UI_UPDATE_THROTTLE_PERIOD:
618 if self._updateUI_DC:
619 self._updateUI_DC.cancel()
620 self._updateUI_DC = None
621
622 self._lastUpdate = now
623
624
625 self.updateState(set)
626 elif not self._updateUI_DC:
627
628
629 self._updateUI_DC = reactor.callLater(UI_UPDATE_THROTTLE_PERIOD,
630 self.update_ui_state)
631
636
638 self.log('[fd %5d] client_removed_handler, reason %s', fd, reason)
639 if reason.value_name == 'GST_CLIENT_STATUS_ERROR':
640 self.warning('[fd %5d] Client removed because of write error' % fd)
641
642 self.resource.clientRemoved(sink, fd, reason, stats)
643 Stats.clientRemoved(self)
644 self.update_ui_state()
645
646
647
663
664
665
666
667
668
670 stats = sink.emit('get-stats', fd)
671 self._pending_removals[fd] = (stats, reason)
672
673
679
680
681
683 if self._updateCallLaterId:
684 self._updateCallLaterId.cancel()
685 self._updateCallLaterId = None
686
687 if self.httpauth:
688 self.httpauth.stopKeepAlive()
689
690 if self._tport:
691 self._tport.stopListening()
692
693
694
695 l = [self.remove_all_clients()]
696
697 if self.type == 'slave' and self._pbclient:
698 l.append(self._pbclient.deregisterPath(self.mountPoint))
699
700 return defer.DeferredList(l)
701
703 """
704 Provide a new set of porter login information, for when we're in slave
705 mode and the porter changes.
706 If we're currently connected, this won't disconnect - it'll just change
707 the information so that next time we try and connect we'll use the
708 new ones
709 """
710 if self.type == 'slave':
711 self._porterUsername = username
712 self._porterPassword = password
713
714 creds = credentials.UsernamePassword(self._porterUsername,
715 self._porterPassword)
716 self._pbclient.startLogin(creds, self.medium)
717
718
719 if path != self._porterPath:
720 self.debug("Changing porter login to use \"%s\"", path)
721 self._porterPath = path
722 self._pbclient.stopTrying()
723
724 self._pbclient.resetDelay()
725 reactor.connectWith(
726 fdserver.FDConnector, self._porterPath,
727 self._pbclient, 10, checkPID=False)
728 else:
729 raise errors.WrongStateError(
730 "Can't specify porter details in master mode")
731
742
744 root = resources.HTTPRoot()
745
746 mount = self.mountPoint[1:]
747 root.putChild(mount, self.resource)
748 if self.type == 'slave':
749
750
751
752
753
754
755
756
757
758
759
760
761
762 self._porterDeferred = d = defer.Deferred()
763 mountpoints = [self.mountPoint]
764 self._pbclient = porterclient.HTTPPorterClientFactory(
765 server.Site(resource=root), mountpoints, d)
766
767 creds = credentials.UsernamePassword(self._porterUsername,
768 self._porterPassword)
769 self._pbclient.startLogin(creds, self.medium)
770
771 self.debug("Starting porter login at \"%s\"", self._porterPath)
772
773 reactor.connectWith(
774 fdserver.FDConnector, self._porterPath,
775 self._pbclient, 10, checkPID=False)
776 else:
777
778 try:
779 self.debug('Listening on %d' % self.port)
780 iface = self.iface or ""
781 self._tport = reactor.listenTCP(self.port, server.Site(resource=root),
782 interface=iface)
783 except error.CannotListenError:
784 t = 'Port %d is not available.' % self.port
785 self.warning(t)
786 m = messages.Error(T_(N_(
787 "Network error: TCP port %d is not available."), self.port))
788 self.addMessage(m)
789 self.setMood(moods.sad)
790 return defer.fail(errors.ComponentStartHandledError(t))
791