Package flumotion :: Package component :: Module feedcomponent
[hide private]

Source Code for Module flumotion.component.feedcomponent

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  Feed components, participating in the stream 
 24  """ 
 25   
 26  import os 
 27   
 28  import gst 
 29  import gst.interfaces 
 30  import gobject 
 31   
 32  from twisted.internet import reactor, defer 
 33  from twisted.spread import pb 
 34  from zope.interface import implements 
 35   
 36  from flumotion.configure import configure 
 37  from flumotion.component import component as basecomponent 
 38  from flumotion.component import feed 
 39  from flumotion.common import common, interfaces, errors, log, pygobject, messages 
 40  from flumotion.common import gstreamer 
 41   
 42  from flumotion.common.planet import moods 
 43  from flumotion.common.pygobject import gsignal 
 44   
 45  from flumotion.common.messages import N_ 
 46  T_ = messages.gettexter('flumotion') 
 47   
class FeedComponentMedium(basecomponent.BaseComponentMedium):
    """
    I am a component-side medium for a FeedComponent to interface with
    the manager-side ComponentAvatar.
    """
    implements(interfaces.IComponentMedium)
    logCategory = 'feedcompmed'
    remoteLogName = 'feedserver'

    def __init__(self, component):
        """
        @param component: L{flumotion.component.feedcomponent.FeedComponent}
        """
        basecomponent.BaseComponentMedium.__init__(self, component)

        # eaterAlias -> (fullFeedId, host, port) tuple, for remote feeders
        self._feederFeedServer = {}
        # eaterAlias -> cancel thunk
        self._feederPendingConnections = {}
        # fullFeedId -> (host, port) tuple, for remote eaters
        self._eaterFeedServer = {}
        # (feederName, fullFeedId) -> cancel thunk
        self._eaterPendingConnections = {}
        self.logName = component.name

    ### Referenceable remote methods which can be called from manager

    def remote_attachPadMonitorToFeeder(self, feederName):
        """
        Ask the component to attach a pad monitor to the given feeder.

        @param feederName: name of the feeder to monitor
        """
        self.comp.attachPadMonitorToFeeder(feederName)

    def remote_setGstDebug(self, debug):
        """
        Sets the GStreamer debugging levels based on the passed debug string.

        The string is a comma-separated list of parts, each either a
        bare integer level (applied as the default threshold) or
        C{glob:level} (applied to the categories matching glob).
        Parts that cannot be parsed are skipped with a warning, so one
        malformed part does not abort the remaining ones.

        @param debug: the debug string; nothing is done when it is empty

        @since: 0.4.2
        """
        self.debug('Setting GStreamer debug level to %s' % debug)
        if not debug:
            return

        for part in debug.split(','):
            glob = None
            pair = part.split(':')
            if len(pair) == 1:
                # assume only the value
                level = pair[0]
            elif len(pair) == 2:
                glob, level = pair
            else:
                self.warning("Cannot parse GStreamer debug setting '%s'." %
                             part)
                continue

            try:
                # value has to be an integer
                value = int(level)
            except ValueError:
                # treat a non-numeric level like any other malformed part
                # instead of failing the whole remote call half-way through
                self.warning("Cannot parse GStreamer debug setting '%s'." %
                             part)
                continue

            if glob:
                try:
                    gst.debug_set_threshold_for_name(glob, value)
                except TypeError:
                    self.warning("Cannot set glob %s to value %s" % (
                        glob, value))
            else:
                gst.debug_set_default_threshold(value)

    def remote_eatFrom(self, eaterAlias, fullFeedId, host, port):
        """
        Tell the component the host and port for the FeedServer through which
        it can connect a local eater to a remote feeder to eat the given
        fullFeedId.

        Called on by the manager-side ComponentAvatar.

        @returns: the deferred returned by L{connectEater}
        """
        self._feederFeedServer[eaterAlias] = (fullFeedId, host, port)
        return self.connectEater(eaterAlias)

    def _getAuthenticatorForFeed(self, eaterAliasOrFeedName):
        """
        Return the authenticator to use when connecting the given eater
        or feed to a remote FeedServer.

        @returns: a copy of our authenticator carrying a full feed id
                  when the authenticator supports copy(), otherwise
                  the authenticator itself
        """
        # The avatarId on the keycards issued by the authenticator will
        # identify us to the remote component. Attempt to use our
        # fullFeedId, for debugging purposes.
        if hasattr(self.authenticator, 'copy'):
            tup = common.parseComponentId(self.authenticator.avatarId)
            flowName, componentName = tup
            fullFeedId = common.fullFeedId(flowName, componentName,
                                           eaterAliasOrFeedName)
            return self.authenticator.copy(fullFeedId)
        else:
            return self.authenticator

    def connectEater(self, eaterAlias):
        """
        Connect one of the medium's component's eaters to a remote feed.
        Called by the component, both on initial connection and for
        reconnecting.

        Any connection attempt still pending for the same eater is
        cancelled first.

        @returns: a deferred firing once the feed has been handed to the
                  component; it has already fired with None when
                  eatFrom() was never called for this eater
        """
        def gotFeed(result):
            feedId, fd = result
            self._feederPendingConnections.pop(eaterAlias, None)
            self.comp.eatFromFD(eaterAlias, feedId, fd)

        if eaterAlias not in self._feederFeedServer:
            self.debug("eatFrom() hasn't been called yet for eater %s",
                       eaterAlias)
            # unclear if this function should have a return value at
            # all...
            return defer.succeed(None)

        (fullFeedId, host, port) = self._feederFeedServer[eaterAlias]

        cancel = self._feederPendingConnections.pop(eaterAlias, None)
        if cancel:
            self.debug('cancelling previous connection attempt on %s',
                       eaterAlias)
            cancel()

        client = feed.FeedMedium(logName=self.comp.name)

        d = client.requestFeed(host, port,
                               self._getAuthenticatorForFeed(eaterAlias),
                               fullFeedId)
        self._feederPendingConnections[eaterAlias] = client.stopConnecting
        d.addCallback(gotFeed)
        return d

    def remote_feedTo(self, feederName, fullFeedId, host, port):
        """
        Tell the component to feed the given feed to the receiving component
        accessible through the FeedServer on the given host and port.

        Called on by the manager-side ComponentAvatar.
        """
        self._eaterFeedServer[fullFeedId] = (host, port)
        self.connectFeeder(feederName, fullFeedId)

    def connectFeeder(self, feederName, fullFeedId):
        """
        Connect one of the medium's component's feeders to the FeedServer
        previously registered for fullFeedId through L{remote_feedTo}.

        Any attempt still pending for the same feeder and target is
        cancelled first.

        @param feederName: name of the local feeder to send
        @param fullFeedId: full feed id of the receiving side
        @returns: a deferred firing once the connection has been handed
                  to the component; it has already fired with None when
                  feedTo() was never called for this fullFeedId
        """
        # Pending attempts are tracked per (feeder, target) pair, so that
        # retrying one target cancels only the stale attempt for that same
        # target. The same key is used to store, cancel and clear.
        pendingKey = (feederName, fullFeedId)

        def gotFeed(result):
            remoteFeedId, fd = result
            self._eaterPendingConnections.pop(pendingKey, None)
            self.comp.feedToFD(feederName, fd, os.close, remoteFeedId)

        if fullFeedId not in self._eaterFeedServer:
            self.debug("feedTo() hasn't been called yet for feeder %s",
                       feederName)
            # unclear if this function should have a return value at
            # all...
            return defer.succeed(None)

        host, port = self._eaterFeedServer[fullFeedId]

        cancel = self._eaterPendingConnections.pop(pendingKey, None)
        if cancel:
            self.debug('cancelling previous connection attempt on %s',
                       feederName)
            cancel()

        client = feed.FeedMedium(logName=self.comp.name)

        d = client.sendFeed(host, port,
                            self._getAuthenticatorForFeed(feederName),
                            fullFeedId)
        self._eaterPendingConnections[pendingKey] = client.stopConnecting
        d.addCallback(gotFeed)
        return d

    def remote_provideMasterClock(self, port):
        """
        Tells the component to start providing a master clock on the given
        UDP port.
        Can only be called if setup() has been called on the component.

        The IP address returned is the local IP the clock is listening on.

        @returns: (ip, port, base_time)
        @rtype:   tuple of (str, int, long)
        """
        self.debug('remote_provideMasterClock(port=%r)' % port)
        return self.comp.provide_master_clock(port)

    def remote_getMasterClockInfo(self):
        """
        Return the clock master info created by a previous call to
        provideMasterClock.

        @returns: (ip, port, base_time)
        @rtype:   tuple of (str, int, long)
        """
        return self.comp.get_master_clock()

    def remote_setMasterClock(self, ip, port, base_time):
        """
        Pass the given master clock information on to the component.

        @returns: whatever the component's set_master_clock() returns
        """
        return self.comp.set_master_clock(ip, port, base_time)

    def remote_effect(self, effectName, methodName, *args, **kwargs):
        """
        Invoke the given methodName on the given effectName in this component.
        The effect should implement effect_(methodName) to receive the call.

        @raises errors.UnknownEffectError: if the component has no such
                                           effect
        @raises errors.NoMethodError:      if the effect has no such method
        @raises errors.RemoteRunError:     if the call raised a TypeError
        @returns: the result of the effect method
        """
        self.debug("calling %s on effect %s" % (methodName, effectName))
        if effectName not in self.comp.effects:
            raise errors.UnknownEffectError(effectName)
        effect = self.comp.effects[effectName]
        if not hasattr(effect, "effect_%s" % methodName):
            raise errors.NoMethodError("%s on effect %s" % (methodName,
                effectName))
        method = getattr(effect, "effect_%s" % methodName)
        try:
            result = method(*args, **kwargs)
        except TypeError:
            msg = "effect method %s did not accept %s and %s" % (
                methodName, args, kwargs)
            self.debug(msg)
            raise errors.RemoteRunError(msg)
        self.debug("effect: result: %r" % result)
        return result

# NOTE(review): this import sits below the FeedComponentMedium definition
# instead of in the import block at the top of the file, presumably to avoid
# a circular import -- confirm before moving it.
from feedcomponent010 import FeedComponent

# Register the medium class defined above on FeedComponent; this has to come
# after both names exist.
FeedComponent.componentMediumClass = FeedComponentMedium

class ParseLaunchComponent(FeedComponent):
    """A component using gst-launch syntax

    @cvar checkTimestamp: whether to check continuity of timestamps for eaters
    @cvar checkOffset:    whether to check continuity of offsets for
                          eaters
    """

    # marks the start and end of a template block in a pipeline string
    DELIMITER = '@'

    # can be set by subclasses
    checkTimestamp = False
    checkOffset = False

    # keep these as class variables for the tests
    FDSRC_TMPL = 'fdsrc name=%(name)s'
    DEPAY_TMPL = 'gdpdepay name=%(name)s-depay'
    FEEDER_TMPL = 'gdppay name=%(name)s-pay ! multifdsink sync=false '\
                  'name=%(name)s buffers-max=500 buffers-soft-max=450 '\
                  'recover-policy=1'
    # built per instance in init()
    EATER_TMPL = None

    def init(self):
        """
        Check for the identity element and build the eater template.

        The continuity checks are switched off, and an Info message is
        added, when the identity element lacks the
        check-imperfect-timestamp property.

        @raises errors.MissingElementError: if the coreelements plugin
                                            cannot be found
        """
        if not gstreamer.get_plugin_version('coreelements'):
            raise errors.MissingElementError('identity')
        if not gstreamer.element_factory_has_property('identity',
            'check-imperfect-timestamp'):
            self.checkTimestamp = False
            self.checkOffset = False
            self.addMessage(
                messages.Info(T_(N_(
                    "You will get more debugging information "
                    "if you upgrade to GStreamer 0.10.13 or later."))))

        self.EATER_TMPL = self.FDSRC_TMPL + ' %(queue)s ' + self.DEPAY_TMPL
        if self.checkTimestamp or self.checkOffset:
            self.EATER_TMPL += " ! identity name=%(name)s-identity silent=TRUE"
            if self.checkTimestamp:
                self.EATER_TMPL += " check-imperfect-timestamp=1"
            if self.checkOffset:
                self.EATER_TMPL += " check-imperfect-offset=1"

    ### FeedComponent interface implementations

    def create_pipeline(self):
        """
        Build the pipeline from the string given by get_pipeline_string().

        On failure an Error message is appended to the component state
        before the exception is raised.

        @raises errors.ComponentSetupHandledError: if an element needed
                                                   for the pipeline
                                                   string is missing
        @raises errors.PipelineParseError: if GStreamer cannot parse the
                                           expanded pipeline string
        @returns: the pipeline returned by gst.parse_launch()
        """
        try:
            unparsed = self.get_pipeline_string(self.config['properties'])
        except errors.MissingElementError as e:
            m = messages.Error(T_(N_(
                "The worker does not have the '%s' element installed.\n"
                "Please install the necessary plug-in and restart "
                "the component.\n"), e.args[0]))
            self.state.append('messages', m)
            raise errors.ComponentSetupHandledError(e)

        self.pipeline_string = self.parse_pipeline(unparsed)

        try:
            pipeline = gst.parse_launch(self.pipeline_string)
        except gobject.GError as e:
            self.warning('Could not parse pipeline: %s' % e.message)
            m = messages.Error(T_(N_(
                "GStreamer error: could not parse component pipeline.")),
                debug=e.message)
            self.state.append('messages', m)
            raise errors.PipelineParseError(e.message)

        return pipeline

    def set_pipeline(self, pipeline):
        """
        Set the pipeline on the component, install the eater continuity
        watch when one of the checks is enabled, then let subclasses
        configure the pipeline.
        """
        FeedComponent.set_pipeline(self, pipeline)
        if self.checkTimestamp or self.checkOffset:
            # name of each eater's identity element -> eater
            watchElements = dict([(e.elementName + '-identity', e)
                                  for e in self.eaters.values()])
            self.install_eater_continuity_watch(watchElements)
        self.configure_pipeline(self.pipeline, self.config['properties'])

    ### ParseLaunchComponent interface for subclasses

    def get_pipeline_string(self, properties):
        """
        Method that must be implemented by subclasses to produce the
        gstparse string for the component's pipeline. Subclasses should
        not chain up; this method raises NotImplementedError.

        @returns: a new pipeline string representation.
        """
        raise NotImplementedError('subclasses should implement '
                                  'get_pipeline_string')

    def configure_pipeline(self, pipeline, properties):
        """
        Method that can be implemented by subclasses if they wish to
        interact with the pipeline after it has been created and set
        on the component.

        This could include attaching signals and bus handlers.
        """
        pass

    ### private methods

    def add_default_eater_feeder(self, pipeline):
        """
        Add the eater and feeder template blocks to the pipeline string
        when the component has exactly one eater or one feeder and its
        block does not appear in the string yet.

        @returns: the possibly extended pipeline string
        """
        # use the class delimiter so that the blocks added here are the
        # ones parse_tmpl() looks for, also when a subclass overrides it
        delimiter = self.DELIMITER
        if len(self.eaters) == 1:
            eater = 'eater:' + list(self.eaters.keys())[0]
            if eater not in pipeline:
                pipeline = delimiter + eater + delimiter + ' ! ' + pipeline
        if len(self.feeders) == 1:
            feeder = 'feeder:' + list(self.feeders.keys())[0]
            if feeder not in pipeline:
                pipeline = pipeline + ' ! ' + delimiter + feeder + delimiter
        return pipeline

    def parse_tmpl(self, pipeline, templatizers):
        """
        Expand the given pipeline string representation by substituting
        blocks between '@' with a filled-in template.

        @param pipeline: a pipeline string representation with variables
        @param templatizers: A dict of prefix => procedure. Template
                             blocks in the pipeline will be replaced
                             with the result of calling the procedure
                             with what is left of the template after
                             taking off the prefix.
        @raises TypeError: if the delimiters are unbalanced, or a block
                           has no colon or an unknown prefix
        @returns: a new pipeline string representation.
        """
        assert pipeline != ''

        # verify the template has an even number of delimiters
        if pipeline.count(self.DELIMITER) % 2 != 0:
            raise TypeError("'%s' contains an odd number of '%s'"
                            % (pipeline, self.DELIMITER))

        out = []
        for i, block in enumerate(pipeline.split(self.DELIMITER)):
            # when splitting, the even-indexed members will remain, and
            # the odd-indexed members are the blocks to be substituted
            if i % 2 == 0:
                out.append(block)
            else:
                block = block.strip()
                try:
                    pos = block.index(':')
                except ValueError:
                    raise TypeError("Template %r has no colon" % (block,))
                # the prefix includes the colon, e.g. 'eater:'
                prefix = block[:pos+1]
                if prefix not in templatizers:
                    raise TypeError("Template %r has invalid prefix %r"
                                    % (block, prefix))
                out.append(templatizers[prefix](block[pos+1:]))
        return ''.join(out)

    def parse_pipeline(self, pipeline):
        """
        Turn the pipeline template of the component into a string that
        can be handed to gst.parse_launch().

        @param pipeline: the pipeline template; may be empty when the
                         component has eaters
        @raises TypeError: if the template is empty and there is no eater
        @returns: the expanded pipeline string
        """
        # collapse all runs of whitespace into single spaces
        pipeline = " ".join(pipeline.split())
        self.debug('Creating pipeline, template is %s', pipeline)

        if pipeline == '' and not self.eaters:
            raise TypeError("Need a pipeline or a eater")

        if pipeline == '':
            # code of dubious value
            assert self.eaters
            pipeline = 'fakesink signal-handoffs=1 silent=1 name=sink'

        pipeline = self.add_default_eater_feeder(pipeline)
        pipeline = self.parse_tmpl(pipeline,
                                   {'eater:': self.get_eater_template,
                                    'feeder:': self.get_feeder_template})

        self.debug('pipeline is %s', pipeline)
        assert self.DELIMITER not in pipeline

        return pipeline

    def get_eater_template(self, eaterAlias):
        """
        Return EATER_TMPL filled in for the given eater.
        """
        queue = self.get_queue_string(eaterAlias)
        elementName = self.eaters[eaterAlias].elementName

        return self.EATER_TMPL % {'name': elementName, 'queue': queue}

    def get_feeder_template(self, feederName):
        """
        Return FEEDER_TMPL filled in for the given feeder.
        """
        elementName = self.feeders[feederName].elementName
        return self.FEEDER_TMPL % {'name': elementName}

    def get_queue_string(self, eaterAlias):
        """
        Return a parse-launch string to join the fdsrc eater element and
        the depayer, for example '!' or '! queue !'. The string may have
        no format strings.
        """
        return '!'

458
class Effect(log.Loggable):
    """
    I am a part of a feed component for a specific group
    of functionality.

    @ivar name:      name of the effect
    @type name:      string
    @ivar component: component owning the effect
    @type component: L{FeedComponent}
    """
    logCategory = "effect"

    def __init__(self, name):
        """
        Create an effect that is not owned by any component yet.

        @param name: the name of the effect
        """
        self.name = name
        self.setComponent(None)

    def setComponent(self, component):
        """
        Set the given component as the effect's owner, and take over
        its UI state.

        @param component: the component to set as an owner of this effect
        @type  component: L{FeedComponent}
        """
        self.component = component

        # no owner, or an owner with a false UI state, both give None
        ownerState = None
        if component:
            ownerState = component.uiState or None
        self.setUIState(ownerState)

    def setUIState(self, state):
        """
        Set the given UI state on the effect. This method is ideal for
        adding keys to the UI state.

        @param state: the UI state for the component to use.
        @type  state: L{flumotion.common.componentui.WorkerComponentUIState}
        """
        self.uiState = state

    def getComponent(self):
        """
        Get the component owning this effect.

        @rtype: L{FeedComponent}
        """
        owner = self.component
        return owner

505
class MultiInputParseLaunchComponent(ParseLaunchComponent):
    """
    This class provides for multi-input ParseLaunchComponents, such as muxers,
    with a queue attached to each input.
    """
    # max-size-buffers given to the queue of each input
    QUEUE_SIZE_BUFFERS = 16

    def get_muxer_string(self, properties):
        """
        Return a gst-parse description of the muxer, which must be named
        'muxer'.

        Must be implemented by subclasses; this method raises
        NotImplementedError.
        """
        # the builtin exception, as in ParseLaunchComponent.get_pipeline_string
        raise NotImplementedError("Implement in a subclass")

    def get_queue_string(self, eaterAlias):
        """
        Return the parse-launch string that puts a queue between the
        fdsrc and the depayer of the given eater. The queue is named
        after the eater's element, with '-queue' appended.
        """
        name = self.eaters[eaterAlias].elementName
        return ("! queue name=%s-queue max-size-buffers=%d !"
                % (name, self.QUEUE_SIZE_BUFFERS))

    def get_pipeline_string(self, properties):
        """
        Return the pipeline template: the muxer description, followed by
        one eater block linked to the muxer for every configured feed.

        When the config has no 'eater' entry but does have a 'source'
        list, the eaters are derived from the sources.

        @returns: a pipeline string representation with template blocks
        """
        eaters = self.config.get('eater', {})
        sources = self.config.get('source', [])
        if eaters == {} and sources != []:
            # for upgrade without manager restart
            feeds = []
            for source in sources:
                # a source without a feed name means its default feed
                if ':' not in source:
                    source = '%s:default' % source
                feeds.append(source)
            eaters = {'default': [(x, 'default') for x in feeds]}

        # use the class delimiter so the blocks are the ones that
        # parse_tmpl() looks for
        block = '%s eater:%%s %s ! muxer. ' % (self.DELIMITER,
                                                self.DELIMITER)

        pipeline = self.get_muxer_string(properties) + ' '
        for eaterName in eaters:
            for feedId, alias in eaters[eaterName]:
                pipeline += block % alias

        pipeline += 'muxer.'

        return pipeline

    def unblock_eater(self, eaterAlias):
        """
        Let a push in progress on the queue of the given eater return,
        by enlarging the queue by one buffer. The queue is set back to
        QUEUE_SIZE_BUFFERS from its next underrun signal whose level is
        below that size.
        """
        # Firstly, ensure that any push in progress is guaranteed to return,
        # by temporarily enlarging the queue
        queuename = self.eaters[eaterAlias].elementName + '-queue'
        queue = self.pipeline.get_by_name(queuename)

        size = queue.get_property("max-size-buffers")
        queue.set_property("max-size-buffers", size + 1)

        # So, now it's guaranteed to return. However, we want to return the
        # queue size to its original value. Doing this in a thread-safe manner
        # is rather tricky...
        def _block_cb(pad, blocked):
            # This is called from streaming threads, but we don't do anything
            # here so it's safe.
            pass

        def _underrun_cb(element):
            # Called from a streaming thread. The queue element does not hold
            # the queue lock when this is called, so we block our sinkpad,
            # then re-check the current level.
            pad = element.get_pad("sink")
            pad.set_blocked_async(True, _block_cb)
            level = element.get_property("current-level-buffers")
            if level < self.QUEUE_SIZE_BUFFERS:
                element.set_property('max-size-buffers',
                    self.QUEUE_SIZE_BUFFERS)
                # signalid is bound below, before this callback can run
                element.disconnect(signalid)
            pad.set_blocked_async(False, _block_cb)

        signalid = queue.connect("underrun", _underrun_cb)

573 574 signalid = queue.connect("underrun", _underrun_cb) 575