Package flumotion :: Package component :: Package producers :: Package looper :: Module looper
[hide private]

Source Code for Module flumotion.component.producers.looper.looper

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import gst 
 23  import gobject 
 24   
 25  from flumotion.common import errors, messages 
 26  from flumotion.component import feedcomponent 
 27   
 28  from flumotion.common.messages import N_ 
 29  T_ = messages.gettexter('flumotion') 
 30   
31 -class LooperMedium(feedcomponent.FeedComponentMedium):
32 - def __init__(self, comp):
34
35 - def remote_gimme5(self, text):
36 return self.comp.do_seek()
37
38 - def remote_getNbIterations(self):
39 return self.comp.nbiterations
40
42 return self.comp.fileinformation
43 44 45 # How to start the first segment: 46 # 1) Make your pipeline, but don't link the sinks 47 # 2) Block the source pads of what would be the sinks' peers 48 # 3) When both block functions fire, link the pads, then do a segment seek 49 # 4) Then you can unblock pads and the sinks will receive exactly one 50 # new segment with all gst versions 51 # 52 # To loop a segment, when you get the segment_done message 53 # asynchronously, just do a new segment seek. 54
55 -class Looper(feedcomponent.ParseLaunchComponent):
56 57 componentMediumClass = LooperMedium 58
59 - def init(self):
60 self.initial_seek = False 61 self.nbiterations = 0 62 self.fileinformation = None 63 self.timeoutid = 0 64 self.pads_awaiting_block = [] 65 self.pads_to_link = [] 66 self.bus = None 67 self.uiState.addKey('info-location', '') 68 self.uiState.addKey('info-duration', 0) 69 self.uiState.addKey('info-audio', None) 70 self.uiState.addKey('info-video', None) 71 self.uiState.addKey('num-iterations', 0) 72 self.uiState.addKey('position', 0)
73
74 - def do_check(self):
75 def on_result(result): 76 for m in result.messages: 77 self.addMessage(m)
78 79 from flumotion.component.producers import checks 80 d = checks.checkTicket349() 81 d.addCallback(on_result) 82 return d
83
84 - def get_pipeline_string(self, properties):
85 # setup the properties 86 self.bus = None 87 self.videowidth = properties.get('width', 240) 88 self.videoheight = properties.get('height', int(576 * self.videowidth/720.)) 89 self.videoframerate = properties.get('framerate', (25, 2)) 90 self.filelocation = properties.get('location') 91 92 vstruct = gst.structure_from_string("video/x-raw-yuv,width=%(width)d,height=%(height)d" % 93 dict (width=self.videowidth, height=self.videoheight)) 94 vstruct['framerate'] = gst.Fraction(self.videoframerate[0], 95 self.videoframerate[1]) 96 97 vcaps = gst.Caps(vstruct) 98 99 self.run_discoverer() 100 101 template = ( 102 'filesrc location=%(location)s' 103 ' ! oggdemux name=demux' 104 ' demux. ! queue ! theoradec name=theoradec' 105 ' ! identity name=videolive single-segment=true silent=true' 106 ' ! videorate name=videorate' 107 ' ! videoscale' 108 ' ! %(vcaps)s' 109 ' ! identity name=vident sync=true silent=true ! @feeder:video@' 110 ' demux. ! queue ! vorbisdec name=vorbisdec' 111 ' ! identity name=audiolive single-segment=true silent=true' 112 ' ! audioconvert' 113 ' ! audio/x-raw-int,width=16,depth=16,signed=(boolean)true' 114 ' ! identity name=aident sync=true silent=true ! @feeder:audio@' 115 % dict(location=self.filelocation, vcaps=vcaps)) 116 117 return template
118
119 - def make_message_for_gstreamer_error(self, gerror, debug):
120 if gerror.domain == 'gst-resource-error-quark': 121 return messages.Error(T_(N_("Could not open file %r."), 122 self.filelocation), 123 debug='%s\n%s' % (gerror.message, debug), 124 id=gerror.domain, priority=40) 125 base = feedcomponent.ParseLaunchComponent 126 return base.make_message_for_gstreamer_error(gerror, debug)
127
128 - def run_discoverer(self):
129 def discovered(d, ismedia): 130 self.uiState.set('info-location', self.filelocation) 131 self.uiState.set('info-duration', 132 max(d.audiolength, d.videolength)) 133 if d.is_audio: 134 self.uiState.set('info-audio', 135 "%d channel(s) %dHz" % (d.audiochannels, 136 d.audiorate)) 137 if d.is_video: 138 self.uiState.set('info-video', 139 "%d x %d at %d/%d fps" % (d.videowidth, 140 d.videoheight, 141 d.videorate.num, 142 d.videorate.denom))
143 144 from gst.extend import discoverer 145 d = discoverer.Discoverer(self.filelocation) 146 d.connect('discovered', discovered) 147 d.discover() 148
149 - def on_segment_done(self):
150 self.do_seek(False) 151 self.nbiterations += 1 152 self.uiState.set('num-iterations', self.nbiterations)
153
154 - def on_pads_blocked(self):
155 for src, sink in self.pads_to_link: 156 src.link(sink) 157 self.do_seek(True) 158 for src, sink in self.pads_to_link: 159 src.set_blocked_async(False, lambda *x: None) 160 self.pads_to_link = [] 161 self.nbiterations = 0 162 self.uiState.set('num-iterations', self.nbiterations)
163
164 - def configure_pipeline(self, pipeline, properties):
165 def on_message(bus, message): 166 handlers = {(pipeline, gst.MESSAGE_SEGMENT_DONE): 167 self.on_segment_done, 168 (pipeline, gst.MESSAGE_APPLICATION): 169 self.on_pads_blocked} 170 171 if (message.src, message.type) in handlers: 172 handlers[(message.src, message.type)]()
173 174 self.oggdemux = pipeline.get_by_name("demux") 175 176 for name in 'aident', 'vident': 177 def blocked(x, is_blocked): 178 if not x in self.pads_awaiting_block: 179 return 180 self.pads_awaiting_block.remove(x) 181 if not self.pads_awaiting_block: 182 s = gst.Structure('pads-blocked') 183 m = gst.message_new_application(pipeline, s) 184 # marshal to the main thread 185 pipeline.post_message(m) 186 187 e = pipeline.get_by_name(name) 188 src = e.get_pad('src') 189 sink = src.get_peer() 190 src.unlink(sink) 191 src.set_blocked_async(True, blocked) 192 self.pads_awaiting_block.append(src) 193 self.pads_to_link.append((src, sink)) 194 195 self.bus = pipeline.get_bus() 196 self.bus.add_signal_watch() 197 198 self.bus.connect('message', on_message) 199
200 - def do_seek(self, flushing):
201 """ 202 Restarts the looping. 203 204 Returns True if the seeking was accepted, 205 Returns False otherwiser 206 """ 207 self.debug("restarting looping") 208 flags = gst.SEEK_FLAG_SEGMENT | (flushing and gst.SEEK_FLAG_FLUSH or 0) 209 return self.oggdemux.seek(1.0, gst.FORMAT_TIME, flags, 210 gst.SEEK_TYPE_SET, 0, gst.SEEK_TYPE_END, 0)
211
212 - def do_setup(self):
213 def check_time(): 214 self.log("checking position") 215 try: 216 pos, format = self.pipeline.query_position(gst.FORMAT_TIME) 217 except: 218 self.debug("position query didn't succeed") 219 else: 220 self.uiState.set('position', pos) 221 return True
222 223 if not self.timeoutid: 224 self.timeoutid = gobject.timeout_add(500, check_time) 225
226 - def do_stop(self):
227 if self.bus: 228 self.bus.remove_signal_watch() 229 self.bus = None 230 231 if self.timeoutid: 232 gobject.source_remove(self.timeoutid) 233 self.timeoutid = 0 234 235 self.nbiterations = 0
236