1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 Feed components, participating in the stream
24 """
25
26 import os
27
28 import gst
29 import gst.interfaces
30 import gobject
31
32 from twisted.internet import reactor, defer
33 from twisted.spread import pb
34 from zope.interface import implements
35
36 from flumotion.configure import configure
37 from flumotion.component import component as basecomponent
38 from flumotion.component import feed
39 from flumotion.common import common, interfaces, errors, log, pygobject, messages
40 from flumotion.common import gstreamer
41
42 from flumotion.common.planet import moods
43 from flumotion.common.pygobject import gsignal
44
45 from flumotion.common.messages import N_
46 T_ = messages.gettexter('flumotion')
47
49 """
50 I am a component-side medium for a FeedComponent to interface with
51 the manager-side ComponentAvatar.
52 """
53 implements(interfaces.IComponentMedium)
54 logCategory = 'feedcompmed'
55 remoteLogName = 'feedserver'
56
58 """
59 @param component: L{flumotion.component.feedcomponent.FeedComponent}
60 """
61 basecomponent.BaseComponentMedium.__init__(self, component)
62
63 self._feederFeedServer = {}
64
65 self._feederPendingConnections = {}
66 self._eaterFeedServer = {}
67
68 self._eaterPendingConnections = {}
69 self.logName = component.name
70
71
74
76 """
77 Sets the GStreamer debugging levels based on the passed debug string.
78
79 @since: 0.4.2
80 """
81 self.debug('Setting GStreamer debug level to %s' % debug)
82 if not debug:
83 return
84
85 for part in debug.split(','):
86 glob = None
87 value = None
88 pair = part.split(':')
89 if len(pair) == 1:
90
91 value = int(pair[0])
92 elif len(pair) == 2:
93 glob, value = pair
94 value = int(value)
95 else:
96 self.warning("Cannot parse GStreamer debug setting '%s'." %
97 part)
98 continue
99
100 if glob:
101 try:
102
103 gst.debug_set_threshold_for_name(glob, value)
104 except TypeError:
105 self.warning("Cannot set glob %s to value %s" % (
106 glob, value))
107 else:
108 gst.debug_set_default_threshold(value)
109
111 """
112 Tell the component the host and port for the FeedServer through which
113 it can connect a local eater to a remote feeder to eat the given
114 fullFeedId.
115
116 Called on by the manager-side ComponentAvatar.
117 """
118 self._feederFeedServer[eaterAlias] = (fullFeedId, host, port)
119 return self.connectEater(eaterAlias)
120
133
def connectEater(self, eaterAlias):
    """
    Request the remote feed registered for one of the component's eaters
    and pass the resulting file descriptor on to the component.

    Used both for the first connection and for reconnecting: an attempt
    that is still pending for the same eater is cancelled before a new
    one is started.

    @param eaterAlias: alias of the local eater to connect

    @returns: a deferred. When no feed server is known yet for
              eaterAlias it has already fired with None; otherwise it is
              the deferred of the feed request, with a callback added
              that gives the feed id and file descriptor to the
              component.
    """
    def feedReceived(result):
        # result is the (feedId, fd) pair the feed request fires with
        feedId, fd = result
        self._feederPendingConnections.pop(eaterAlias, None)
        self.comp.eatFromFD(eaterAlias, feedId, fd)

    try:
        fullFeedId, host, port = self._feederFeedServer[eaterAlias]
    except KeyError:
        self.debug("eatFrom() hasn't been called yet for eater %s",
                   eaterAlias)
        return defer.succeed(None)

    # Only one connection attempt per eater may be outstanding
    stopPrevious = self._feederPendingConnections.pop(eaterAlias, None)
    if stopPrevious:
        self.debug('cancelling previous connection attempt on %s',
                   eaterAlias)
        stopPrevious()

    medium = feed.FeedMedium(logName=self.comp.name)

    request = medium.requestFeed(host, port,
                                 self._getAuthenticatorForFeed(eaterAlias),
                                 fullFeedId)
    # Remember how to abort this attempt until feedReceived clears it
    self._feederPendingConnections[eaterAlias] = medium.stopConnecting
    request.addCallback(feedReceived)
    return request
170
172 """
173 Tell the component to feed the given feed to the receiving component
174 accessible through the FeedServer on the given host and port.
175
176 Called on by the manager-side ComponentAvatar.
177 """
178 self._eaterFeedServer[fullFeedId] = (host, port)
179 self.connectFeeder(feederName, fullFeedId)
180
def connectFeeder(self, feederName, fullFeedId):
    """
    Connect one of the medium's component's feeders to the FeedServer
    registered for fullFeedId, and hand the resulting file descriptor
    to the component.

    Any connection attempt still pending for the same feeder is
    cancelled before a new one is started.

    @param feederName: name of the local feeder to connect
    @param fullFeedId: full id of the feed to send; also the key under
                       which the FeedServer host and port were stored

    @returns: a deferred. When no host and port are known yet for
              fullFeedId it has already fired with None; otherwise it is
              the deferred of the sendFeed() call, with a callback added
              that passes the file descriptor to the component.
    """
    def gotFeed(result):
        # result is the (fullFeedId, fd) pair sendFeed() fires with;
        # unpacked here instead of in the signature so the callback
        # does not depend on tuple parameters
        fullFeedId, fd = result
        self._eaterPendingConnections.pop(feederName, None)
        self.comp.feedToFD(feederName, fd, os.close, fullFeedId)

    if fullFeedId not in self._eaterFeedServer:
        self.debug("feedTo() hasn't been called yet for feeder %s",
                   feederName)
        return defer.succeed(None)

    host, port = self._eaterFeedServer[fullFeedId]

    # Pending attempts are stored (below) and cleared (in gotFeed) under
    # feederName, so the lookup has to use that same key; looking it up
    # under fullFeedId never found the previous attempt.
    cancel = self._eaterPendingConnections.pop(feederName, None)
    if cancel:
        self.debug('cancelling previous connection attempt on %s',
                   feederName)
        cancel()

    client = feed.FeedMedium(logName=self.comp.name)

    d = client.sendFeed(host, port,
                        self._getAuthenticatorForFeed(feederName),
                        fullFeedId)
    self._eaterPendingConnections[feederName] = client.stopConnecting
    d.addCallback(gotFeed)
    return d
216
218 """
219 Tells the component to start providing a master clock on the given
220 UDP port.
221 Can only be called if setup() has been called on the component.
222
223 The IP address returned is the local IP the clock is listening on.
224
225 @returns: (ip, port, base_time)
226 @rtype: tuple of (str, int, long)
227 """
228 self.debug('remote_provideMasterClock(port=%r)' % port)
229 return self.comp.provide_master_clock(port)
230
232 """
233 Return the clock master info created by a previous call to provideMasterClock.
234
235 @returns: (ip, port, base_time)
236 @rtype: tuple of (str, int, long)
237 """
238 return self.comp.get_master_clock()
239
242
def remote_effect(self, effectName, methodName, *args, **kwargs):
    """
    Call a method of one of the component's effects.

    The call is dispatched to the attribute effect_<methodName> of the
    effect stored under effectName in the component's effects, with the
    remaining positional and keyword arguments passed through.

    @param effectName: key of the effect in the component's effects
    @param methodName: name of the method, without the effect_ prefix

    @returns: whatever the effect method returns
    @raises errors.UnknownEffectError: when effectName is not a known
                                       effect
    @raises errors.NoMethodError:      when the effect has no matching
                                       effect_ attribute
    @raises errors.RemoteRunError:     when the call raises a TypeError
    """
    self.debug("calling %s on effect %s" % (methodName, effectName))

    knownEffects = self.comp.effects
    if effectName not in knownEffects:
        raise errors.UnknownEffectError(effectName)
    target = knownEffects[effectName]

    attrName = "effect_%s" % methodName
    if not hasattr(target, attrName):
        raise errors.NoMethodError("%s on effect %s" % (methodName,
                                                        effectName))
    handler = getattr(target, attrName)

    try:
        outcome = handler(*args, **kwargs)
    except TypeError:
        # reported to the caller as a failed remote run
        msg = "effect method %s did not accept %s and %s" % (
            methodName, args, kwargs)
        self.debug(msg)
        raise errors.RemoteRunError(msg)
    else:
        self.debug("effect: result: %r" % outcome)
        return outcome
265
266 from feedcomponent010 import FeedComponent
267
268 FeedComponent.componentMediumClass = FeedComponentMedium
269
271 """A component using gst-launch syntax
272
273 @cvar checkTimestamp: whether to check continuity of timestamps for eaters
274 @cvar checkOffset: whether to check continuity of offsets for
275 eaters
276 """
277
278 DELIMITER = '@'
279
280
281 checkTimestamp = False
282 checkOffset = False
283
284
285 FDSRC_TMPL = 'fdsrc name=%(name)s'
286 DEPAY_TMPL = 'gdpdepay name=%(name)s-depay'
287 FEEDER_TMPL = 'gdppay name=%(name)s-pay ! multifdsink sync=false '\
288 'name=%(name)s buffers-max=500 buffers-soft-max=450 '\
289 'recover-policy=1'
290 EATER_TMPL = None
291
311
312
337
345
346
348 """
349 Method that must be implemented by subclasses to produce the
350 gstparse string for the component's pipeline. Subclasses should
351 not chain up; this method raises a NotImplemented error.
352
353 Returns: a new pipeline string representation.
354 """
355 raise NotImplementedError('subclasses should implement '
356 'get_pipeline_string')
357
367
368
379
381 """
382 Expand the given pipeline string representation by substituting
383 blocks between '@' with a filled-in template.
384
385 @param pipeline: a pipeline string representation with variables
386 @param templatizers: A dict of prefix => procedure. Template
387 blocks in the pipeline will be replaced
388 with the result of calling the procedure
389 with what is left of the template after
390 taking off the prefix.
391 @returns: a new pipeline string representation.
392 """
393 assert pipeline != ''
394
395
396 if pipeline.count(self.DELIMITER) % 2 != 0:
397 raise TypeError("'%s' contains an odd number of '%s'"
398 % (pipeline, self.DELIMITER))
399
400 out = []
401 for i, block in enumerate(pipeline.split(self.DELIMITER)):
402
403
404 if i % 2 == 0:
405 out.append(block)
406 else:
407 block = block.strip()
408 try:
409 pos = block.index(':')
410 except ValueError:
411 raise TypeError("Template %r has no colon" % (block,))
412 prefix = block[:pos+1]
413 if prefix not in templatizers:
414 raise TypeError("Template %r has invalid prefix %r"
415 % (block, prefix))
416 out.append(templatizers[prefix](block[pos+1:]))
417 return ''.join(out)
418
440
442 queue = self.get_queue_string(eaterAlias)
443 elementName = self.eaters[eaterAlias].elementName
444
445 return self.EATER_TMPL % {'name': elementName, 'queue': queue}
446
448 elementName = self.feeders[feederName].elementName
449 return self.FEEDER_TMPL % {'name': elementName}
450
452 """
453 Return a parse-launch string to join the fdsrc eater element and
454 the depayer, for example '!' or '! queue !'. The string may have
455 no format strings.
456 """
457 return '!'
458
460 """
461 I am a part of a feed component for a specific group
462 of functionality.
463
464 @ivar name: name of the effect
465 @type name: string
466 @ivar component: component owning the effect
467 @type component: L{FeedComponent}
468 """
469 logCategory = "effect"
470
472 """
473 @param name: the name of the effect
474 """
475 self.name = name
476 self.setComponent(None)
477
479 """
480 Set the given component as the effect's owner.
481
482 @param component: the component to set as an owner of this effect
483 @type component: L{FeedComponent}
484 """
485 self.component = component
486 self.setUIState(component and component.uiState or None)
487
489 """
490 Set the given UI state on the effect. This method is ideal for
491 adding keys to the UI state.
492
493 @param state: the UI state for the component to use.
494 @type state: L{flumotion.common.componentui.WorkerComponentUIState}
495 """
496 self.uiState = state
497
499 """
500 Get the component owning this effect.
501
502 @rtype: L{FeedComponent}
503 """
504 return self.component
505
573
574 signalid = queue.connect("underrun", _underrun_cb)
575