Package flumotion :: Package component :: Module feeder
[hide private]

Source Code for Module flumotion.component.feeder

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22   
 23  import time 
 24   
 25  import gst 
 26   
 27  from twisted.internet import reactor 
 28   
 29  from flumotion.common import componentui 
 30   
 31   
32 -class Feeder:
33 """ 34 This class groups feeder-related information as used by a Feed Component. 35 36 @ivar feederName: name of the feeder 37 @ivar uiState: the serializable UI State for this feeder 38 """
39 - def __init__(self, feederName):
40 self.feederName = feederName 41 self.elementName = 'feeder:' + feederName 42 self.payName = self.elementName + '-pay' 43 self.uiState = componentui.WorkerComponentUIState() 44 self.uiState.addKey('feederName') 45 self.uiState.set('feederName', feederName) 46 self.uiState.addListKey('clients') 47 self._fdToClient = {} # fd -> (FeederClient, cleanupfunc) 48 self._clients = {} # id -> FeederClient
49
50 - def __repr__(self):
51 return ('<Feeder %s (%d client(s))>' 52 % (self.feederName, len(self._clients)))
53
54 - def clientConnected(self, clientId, fd, cleanup):
55 """ 56 The given client has connected on the given file descriptor, and is 57 being added to multifdsink. This is called solely from the reactor 58 thread. 59 60 @param clientId: id of the client of the feeder 61 @param fd: file descriptor representing the client 62 @param cleanup: callable to be called when the given fd is removed 63 """ 64 if clientId not in self._clients: 65 # first time we see this client, create an object 66 client = FeederClient(clientId) 67 self._clients[clientId] = client 68 self.uiState.append('clients', client.uiState) 69 70 client = self._clients[clientId] 71 self._fdToClient[fd] = (client, cleanup) 72 73 client.connected(fd) 74 75 return client
76
77 - def clientDisconnected(self, fd):
78 """ 79 The client has been entirely removed from multifdsink, and we may 80 now close its file descriptor. 81 The client object stays around so we can track over multiple 82 connections. 83 84 Called from GStreamer threads. 85 86 @type fd: file descriptor 87 """ 88 (client, cleanup) = self._fdToClient.pop(fd) 89 client.disconnected(fd=fd) 90 91 # To avoid races between this thread (a GStreamer thread) closing the 92 # FD, and the reactor thread reusing this FD, we only actually perform 93 # the close in the reactor thread. 94 reactor.callFromThread(cleanup, fd)
95
96 - def getClients(self):
97 """ 98 @rtype: list of all L{FeederClient}s ever seen, including currently 99 disconnected clients 100 """ 101 return self._clients.values()
102
103 -class FeederClient:
104 """ 105 This class groups information related to the client of a feeder. 106 The client is identified by an id. 107 The information remains valid for the lifetime of the feeder, so it 108 can track reconnects of the client. 109 110 @ivar clientId: id of the client of the feeder 111 @ivar fd: file descriptor the client is currently using, or None. 112 """
113 - def __init__(self, clientId):
114 self.uiState = componentui.WorkerComponentUIState() 115 self.uiState.addKey('clientId', clientId) 116 self.fd = None 117 self.uiState.addKey('fd', None) 118 119 # these values can be set to None, which would mean 120 # Unknown, not supported 121 # these are supported 122 for key in ( 123 'bytesReadCurrent', # bytes read over current connection 124 'bytesReadTotal', # bytes read over all connections 125 'reconnects', # number of connections made by this client 126 'lastConnect', # last client connection, in epoch seconds 127 'lastDisconnect', # last client disconnect, in epoch seconds 128 'lastActivity', # last time client read or connected 129 ): 130 self.uiState.addKey(key, 0) 131 # these are possibly unsupported 132 for key in ( 133 'buffersDroppedCurrent', # buffers dropped over current connection 134 'buffersDroppedTotal', # buffers dropped over all connections 135 ): 136 self.uiState.addKey(key, None) 137 138 # internal state allowing us to track global numbers 139 self._buffersDroppedBefore = 0 140 self._bytesReadBefore = 0
141
142 - def setStats(self, stats):
143 """ 144 @type stats: list 145 """ 146 bytesSent = stats[0] 147 #timeAdded = stats[1] 148 #timeRemoved = stats[2] 149 #timeActive = stats[3] 150 timeLastActivity = float(stats[4]) / gst.SECOND 151 if len(stats) > 5: 152 # added in gst-plugins-base 0.10.11 153 buffersDropped = stats[5] 154 else: 155 # We don't know, but we cannot use None 156 # since that would break integer addition below 157 buffersDropped = 0 158 159 self.uiState.set('bytesReadCurrent', bytesSent) 160 self.uiState.set('buffersDroppedCurrent', buffersDropped) 161 self.uiState.set('bytesReadTotal', self._bytesReadBefore + bytesSent) 162 self.uiState.set('lastActivity', timeLastActivity) 163 if buffersDropped is not None: 164 self.uiState.set('buffersDroppedTotal', 165 self._buffersDroppedBefore + buffersDropped)
166
167 - def connected(self, fd, when=None):
168 """ 169 The client has connected on this fd. 170 Update related stats. 171 172 Called only from the reactor thread. 173 """ 174 if not when: 175 when = time.time() 176 177 if self.fd: 178 # It's normal to receive a reconnection before we notice 179 # that an old connection has been closed. Perform the 180 # disconnection logic for the old FD if necessary. See #591. 181 self._updateUIStateForDisconnect(self.fd, when) 182 183 self.fd = fd 184 self.uiState.set('fd', fd) 185 self.uiState.set('lastConnect', when) 186 self.uiState.set('reconnects', self.uiState.get('reconnects', 0) + 1)
187
188 - def _updateUIStateForDisconnect(self, fd, when):
189 if self.fd == fd: 190 self.fd = None 191 self.uiState.set('fd', None) 192 self.uiState.set('lastDisconnect', when) 193 194 # update our internal counters and reset current counters to 0 195 self._bytesReadBefore += self.uiState.get('bytesReadCurrent') 196 self.uiState.set('bytesReadCurrent', 0) 197 if self.uiState.get('buffersDroppedCurrent') is not None: 198 self._buffersDroppedBefore += self.uiState.get( 199 'buffersDroppedCurrent') 200 self.uiState.set('buffersDroppedCurrent', 0)
201
202 - def disconnected(self, when=None, fd=None):
203 """ 204 The client has disconnected. 205 Update related stats. 206 207 Called from GStreamer threads. 208 """ 209 if self.fd != fd: 210 # assume that connected() already called 211 # _updateUIStateForDisconnect for us 212 return 213 214 if not when: 215 when = time.time() 216 217 reactor.callFromThread(self._updateUIStateForDisconnect, fd, 218 when)
219