1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import time

import gst

from twisted.internet import reactor

from flumotion.common import componentui


class Feeder:
    """
    This class groups feeder-related information as used by a Feed Component.

    @ivar feederName: name of the feeder
    @ivar uiState: the serializable UI State for this feeder
    """

    # NOTE(review): the class/def header lines were missing from the
    # extracted source.  __init__/__repr__ and all parameter names follow
    # from the bodies and docstrings; the public method names
    # clientConnected, clientDisconnected and getClients are reconstructed
    # and should be confirmed against the callers.

    def __init__(self, feederName):
        """
        @param feederName: name of the feeder
        """
        self.feederName = feederName
        self.elementName = 'feeder:' + feederName
        self.payName = self.elementName + '-pay'

        self.uiState = componentui.WorkerComponentUIState()
        self.uiState.addKey('feederName')
        self.uiState.set('feederName', feederName)
        self.uiState.addListKey('clients')

        # fd -> (FeederClient, cleanup callable), for fds currently in use
        self._fdToClient = {}
        # clientId -> FeederClient, kept so reconnects can be tracked
        self._clients = {}

    def __repr__(self):
        return ('<Feeder %s (%d client(s))>'
                % (self.feederName, len(self._clients)))

    def clientConnected(self, clientId, fd, cleanup):
        """
        The given client has connected on the given file descriptor, and is
        being added to multifdsink. This is called solely from the reactor
        thread.

        @param clientId: id of the client of the feeder
        @param fd:       file descriptor representing the client
        @param cleanup:  callable to be called when the given fd is removed

        @returns: the L{FeederClient} tracking this client id
        """
        client = self._clients.get(clientId)
        if client is None:
            # First connection for this id: create the tracking object and
            # publish its UI state in the feeder's 'clients' list.
            client = FeederClient(clientId)
            self._clients[clientId] = client
            self.uiState.append('clients', client.uiState)

        self._fdToClient[fd] = (client, cleanup)
        client.connected(fd)

        return client

    def clientDisconnected(self, fd):
        """
        The client has been entirely removed from multifdsink, and we may
        now close its file descriptor.
        The client object stays around so we can track over multiple
        connections.

        Called from GStreamer threads.

        @type fd: file descriptor

        @raise KeyError: if fd was never registered through clientConnected
        """
        (client, cleanup) = self._fdToClient.pop(fd)
        client.disconnected(fd=fd)

        # We are not in the reactor thread here, so hand the cleanup
        # callable over to the reactor instead of invoking it directly.
        reactor.callFromThread(cleanup, fd)

    def getClients(self):
        """
        @rtype: list of all L{FeederClient}s ever seen, including currently
                disconnected clients
        """
        # Return a real list as documented; on Python 3 values() is only a
        # view onto the dict.
        return list(self._clients.values())


class FeederClient:
    """
    This class groups information related to the client of a feeder.
    The client is identified by an id.
    The information remains valid for the lifetime of the feeder, so it
    can track reconnects of the client.

    @ivar clientId: id of the client of the feeder
    @ivar fd:       file descriptor the client is currently using, or None.
    """

    # NOTE(review): the class/def header lines were missing from the
    # extracted source.  connected, disconnected and
    # _updateUIStateForDisconnect follow from their call sites; the name
    # setStats is reconstructed and should be confirmed against the callers.

    def __init__(self, clientId):
        """
        @param clientId: id of the client of the feeder
        """
        self.uiState = componentui.WorkerComponentUIState()
        self.uiState.addKey('clientId', clientId)
        self.fd = None
        self.uiState.addKey('fd', None)

        # Counters and timestamps that start out at 0.
        for key in (
            'bytesReadCurrent',
            'bytesReadTotal',
            'reconnects',
            'lastConnect',
            'lastDisconnect',
            'lastActivity',
        ):
            self.uiState.addKey(key, 0)

        # Dropped-buffer counters start out as None until stats arrive.
        for key in (
            'buffersDroppedCurrent',
            'buffersDroppedTotal',
        ):
            self.uiState.addKey(key, None)

        # Totals accumulated over previous connections of this client;
        # the *Current values are folded into these on disconnect.
        self._buffersDroppedBefore = 0
        self._bytesReadBefore = 0

    def setStats(self, stats):
        """
        Update the UI state from a stats sequence for the current connection.

        Index 0 is used as the number of bytes sent, index 4 as the time of
        last activity (divided by gst.SECOND), and the optional index 5 as
        the number of dropped buffers.
        NOTE(review): presumably this is the per-client stats result of
        multifdsink - confirm against the caller.

        @type stats: list
        """
        bytesSent = stats[0]
        timeLastActivity = float(stats[4]) / gst.SECOND
        if len(stats) > 5:
            buffersDropped = stats[5]
        else:
            # No dropped-buffer figure supplied; use 0 so the addition
            # below stays well-defined.
            buffersDropped = 0

        self.uiState.set('bytesReadCurrent', bytesSent)
        self.uiState.set('buffersDroppedCurrent', buffersDropped)
        self.uiState.set('bytesReadTotal', self._bytesReadBefore + bytesSent)
        self.uiState.set('lastActivity', timeLastActivity)
        if buffersDropped is not None:
            self.uiState.set('buffersDroppedTotal',
                self._buffersDroppedBefore + buffersDropped)

    def connected(self, fd, when=None):
        """
        The client has connected on this fd.
        Update related stats.

        Called only from the reactor thread.

        @param fd:   file descriptor the client connected on
        @param when: time of the connection; defaults to the current time
        """
        # Compare against None so an explicit timestamp of 0 is honoured.
        if when is None:
            when = time.time()

        # Compare against None: 0 is a valid file descriptor.  A previous
        # fd still being set means the disconnect has not been accounted
        # for yet, so do that before switching to the new fd.
        if self.fd is not None:
            self._updateUIStateForDisconnect(self.fd, when)

        self.fd = fd
        self.uiState.set('fd', fd)
        self.uiState.set('lastConnect', when)
        self.uiState.set('reconnects', self.uiState.get('reconnects', 0) + 1)

    def _updateUIStateForDisconnect(self, fd, when):
        """
        Record a disconnect of fd at the given time and fold the counters of
        the finished connection into the running totals.

        @param fd:   file descriptor that was disconnected
        @param when: time of the disconnect
        """
        # NOTE(review): indentation was lost in the extracted source; only
        # the two fd-clearing statements are taken to be conditional.
        if self.fd == fd:
            self.fd = None
            self.uiState.set('fd', None)
        self.uiState.set('lastDisconnect', when)

        # Move the current counters into the "before" totals and reset them.
        self._bytesReadBefore += self.uiState.get('bytesReadCurrent')
        self.uiState.set('bytesReadCurrent', 0)
        if self.uiState.get('buffersDroppedCurrent') is not None:
            self._buffersDroppedBefore += self.uiState.get(
                'buffersDroppedCurrent')
            self.uiState.set('buffersDroppedCurrent', 0)

    def disconnected(self, when=None, fd=None):
        """
        The client has disconnected.
        Update related stats.

        Called from GStreamer threads.

        @param when: time of the disconnect; defaults to the current time
        @param fd:   file descriptor that was disconnected
        """
        if self.fd != fd:
            # This fd is not the one currently recorded for the client, so
            # there is nothing to update for it here.
            return

        # Compare against None so an explicit timestamp of 0 is honoured.
        if when is None:
            when = time.time()

        # The UI state is updated from the reactor thread, not from here.
        reactor.callFromThread(self._updateUIStateForDisconnect, fd,
            when)