1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import gst
23
24 import time
25
26 from twisted.internet import reactor, defer
27
28 from flumotion.common import log, common
29
30
32 PAD_MONITOR_PROBE_FREQUENCY = 5.0
33 PAD_MONITOR_TIMEOUT = PAD_MONITOR_PROBE_FREQUENCY * 2.5
34
35 - def __init__(self, pad, name, setActive, setInactive):
55
61
64
66 self.check_poller.stop()
67 self.watch_poller.stop()
68
69
70
71
72 d, probe_id = self._probe_id.pop("id", (None, None))
73 if probe_id:
74 self._pad.remove_buffer_probe(probe_id)
75 d.callback(None)
76
78 def probe_cb(pad, buffer):
79 """
80 Periodically scheduled buffer probe, that ensures that we're
81 currently actually having dataflow through our eater
82 elements.
83
84 Called from GStreamer threads.
85
86 @param pad: The gst.Pad srcpad for one eater in this
87 component.
88 @param buffer: A gst.Buffer that has arrived on this pad
89 """
90 self._last_data_time = time.time()
91
92 self.logMessage('buffer probe on %s has timestamp %s', self.name,
93 gst.TIME_ARGS(buffer.timestamp))
94
95 deferred, probe_id = self._probe_id.pop("id", (None, None))
96 if probe_id:
97
98 self._pad.remove_buffer_probe(probe_id)
99
100 reactor.callFromThread(deferred.callback, None)
101
102 reactor.callFromThread(self.watch_poller.run)
103
104 self._first = False
105
106
107 return True
108
109 d = defer.Deferred()
110
111
112 self._probe_id['id'] = (d, self._pad.add_buffer_probe(probe_cb))
113 return d
114
116 self.log('last buffer for %s at %r', self.name, self._last_data_time)
117
118 now = time.time()
119
120 if self._last_data_time < 0:
121
122 self._last_data_time = 0
123 self.setInactive()
124 elif self._last_data_time == 0:
125
126 pass
127 else:
128
129 delta = now - self._last_data_time
130
131 if self._active and delta > self.PAD_MONITOR_TIMEOUT:
132 self.info("No data received on pad %s for > %r seconds, setting "
133 "to hungry", self.name, self.PAD_MONITOR_TIMEOUT)
134 self.setInactive()
135 elif not self._active and delta < self.PAD_MONITOR_TIMEOUT:
136 self.info("Receiving data again on pad %s, flow active",
137 self.name)
138 self.setActive()
139
141 self._active = False
142 self._doSetInactive(self.name)
143
145 self._active = True
146 self._doSetActive(self.name)
147
149 - def __init__(self, pad, name, setActive, setInactive,
150 reconnectEater, *args):
156
158 PadMonitor.setInactive(self)
159
160
161
162
163
164 self._last_data_time = 0
165
166 self._reconnectPoller.start(immediately=True)
167
171
175
176
178 - def __init__(self, setActive, setInactive):
179
180
181 self._doSetActive = setActive
182 self._doSetInactive = setInactive
183 self._wasActive = True
184
186 """
187 Watch for data flow through this pad periodically.
188 If data flow ceases for too long, we turn hungry. If data flow resumes,
189 we return to happy.
190 """
191 def monitorActive(name):
192 self.info('Pad data flow at %s is active', name)
193 if self.isActive() and not self._wasActive:
194
195
196
197
198 self._wasActive = True
199 self._doSetActive()
200
201 def monitorInactive(name):
202 self.info('Pad data flow at %s is inactive', name)
203 if self._wasActive:
204 self._doSetInactive()
205 self._wasActive = False
206
207 assert name not in self
208 monitor = klass(pad, name, monitorActive, monitorInactive, *args)
209 self[monitor.name] = monitor
210 self.info("Added pad monitor %s", monitor.name)
211
213 if name not in self:
214 self.warning("No pad monitor with name %s", name)
215 return
216
217 monitor = self.pop(name)
218 monitor.detach()
219
221 for monitor in self.values():
222 if not monitor.isActive():
223 return False
224 return True
225