Package flumotion :: Package component :: Package producers :: Package playlist :: Module playlist
[hide private]

Source Code for Module flumotion.component.producers.playlist.playlist

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import gst 
 23  import gobject 
 24  import time 
 25   
 26  from twisted.internet import defer, reactor 
 27   
 28  from flumotion.common import errors, messages, log, fxml 
 29  from flumotion.component import feedcomponent 
 30  from flumotion.component.base import watcher 
 31   
 32  from flumotion.common.messages import N_ 
 33   
 34  import smartscale 
 35  import singledecodebin 
 36  import playlistparser 
 37   
 38  T_ = messages.gettexter('flumotion') 
 39   
40 -def _tsToString(ts):
41 """ 42 Return a string in local time from a gstreamer timestamp value 43 """ 44 return time.ctime(ts/gst.SECOND)
45
46 -def videotest_gnl_src(name, start, duration, priority, pattern=None):
47 src = gst.element_factory_make('videotestsrc') 48 if pattern: 49 src.props.pattern = pattern 50 else: 51 # Set videotestsrc to all black. 52 src.props.pattern = 2 53 gnlsrc = gst.element_factory_make('gnlsource', name) 54 gnlsrc.props.start = start 55 gnlsrc.props.duration = duration 56 gnlsrc.props.media_start = 0 57 gnlsrc.props.media_duration = duration 58 gnlsrc.props.priority = priority 59 gnlsrc.add(src) 60 61 return gnlsrc
62
63 -def audiotest_gnl_src(name, start, duration, priority, wave=None):
64 src = gst.element_factory_make('audiotestsrc') 65 if wave: 66 src.props.wave = wave 67 else: 68 # Set audiotestsrc to use silence. 69 src.props.wave = 4 70 gnlsrc = gst.element_factory_make('gnlsource', name) 71 gnlsrc.props.start = start 72 gnlsrc.props.duration = duration 73 gnlsrc.props.media_start = 0 74 gnlsrc.props.media_duration = duration 75 gnlsrc.props.priority = priority 76 gnlsrc.add(src) 77 78 return gnlsrc
79
80 -def file_gnl_src(name, uri, caps, start, duration, offset, priority):
81 src = singledecodebin.SingleDecodeBin(caps, uri) 82 gnlsrc = gst.element_factory_make('gnlsource', name) 83 gnlsrc.props.start = start 84 gnlsrc.props.duration = duration 85 gnlsrc.props.media_start = offset 86 gnlsrc.props.media_duration = duration 87 gnlsrc.props.priority = priority 88 gnlsrc.add(src) 89 90 return gnlsrc
91
92 -class PlaylistProducerMedium(feedcomponent.FeedComponentMedium):
93 - def __init__(self, comp):
95
96 - def remote_add_playlist(self, data):
97 self.comp.addPlaylist(data)
98
99 -class PlaylistProducer(feedcomponent.FeedComponent):
100 logCategory = 'playlist-prod' 101 componentMediumClass = PlaylistProducerMedium 102
103 - def init(self):
104 self.basetime = -1 105 106 self._hasAudio = True 107 self._hasVideo = True 108 109 # The gnlcompositions for audio and video 110 self.videocomp = None 111 self.audiocomp = None 112 113 self.videocaps = gst.Caps("video/x-raw-yuv;video/x-raw-rgb") 114 self.audiocaps = gst.Caps("audio/x-raw-int;audio/x-raw-float") 115 116 self._vsrcs = {} # { PlaylistItem -> gnlsource } 117 self._asrcs = {} # { PlaylistItem -> gnlsource }
118
119 - def _buildAudioPipeline(self, pipeline, src):
120 audiorate = gst.element_factory_make("audiorate") 121 audioconvert = gst.element_factory_make('audioconvert') 122 audioresample = gst.element_factory_make('audioresample') 123 outcaps = gst.Caps( 124 "audio/x-raw-int,channels=%d,rate=%d,width=16,depth=16" % 125 (self._channels, self._samplerate)) 126 127 capsfilter = gst.element_factory_make("capsfilter") 128 capsfilter.props.caps = outcaps 129 130 pipeline.add(audiorate, audioconvert, audioresample, capsfilter) 131 src.link(audioconvert) 132 audioconvert.link(audioresample) 133 audioresample.link(audiorate) 134 audiorate.link(capsfilter) 135 136 return capsfilter.get_pad('src')
137
138 - def _buildVideoPipeline(self, pipeline, src):
139 outcaps = gst.Caps( 140 "video/x-raw-yuv,width=%d,height=%d,framerate=%d/%d," 141 "pixel-aspect-ratio=1/1" % 142 (self._width, self._height, self._framerate[0], 143 self._framerate[1])) 144 145 cspace = gst.element_factory_make("ffmpegcolorspace") 146 scaler = smartscale.SmartVideoScale() 147 scaler.set_caps(outcaps) 148 videorate = gst.element_factory_make("videorate") 149 capsfilter = gst.element_factory_make("capsfilter") 150 capsfilter.props.caps = outcaps 151 152 pipeline.add(cspace, scaler, videorate, capsfilter) 153 154 src.link(cspace) 155 cspace.link(scaler) 156 scaler.link(videorate) 157 videorate.link(capsfilter) 158 return capsfilter.get_pad('src')
159
160 - def _buildPipeline(self):
161 pipeline = gst.Pipeline() 162 163 for mediatype in ['audio', 'video']: 164 if (mediatype == 'audio' and not self._hasAudio) or ( 165 mediatype == 'video' and not self._hasVideo): 166 continue 167 168 # For each of audio, video, we build a pipeline that looks roughly 169 # like: 170 # 171 # gnlcomposition ! identity single-segment=true ! 172 # audio/video-elements ! identity sync=true ! sink 173 174 composition = gst.element_factory_make("gnlcomposition", 175 mediatype + "-composition") 176 177 segmentidentity = gst.element_factory_make("identity") 178 segmentidentity.set_property("single-segment", True) 179 segmentidentity.set_property("silent", True) 180 syncidentity = gst.element_factory_make("identity") 181 syncidentity.set_property("silent", True) 182 syncidentity.set_property("sync", True) 183 184 pipeline.add(composition, segmentidentity, syncidentity) 185 186 def _padAddedCb(element, pad, target): 187 self.debug("Pad added, linking") 188 pad.link(target)
189 composition.connect('pad-added', _padAddedCb, 190 segmentidentity.get_pad("sink")) 191 192 if mediatype == 'audio': 193 self.audiocomp = composition 194 srcpad = self._buildAudioPipeline(pipeline, segmentidentity) 195 else: 196 self.videocomp = composition 197 srcpad = self._buildVideoPipeline(pipeline, segmentidentity) 198 199 srcpad.link(syncidentity.get_pad('sink')) 200 201 feedername = 'feeder:%s:%s' % (self.name, mediatype) 202 chunk = self.FEEDER_TMPL % {'name': feedername} 203 binstr = "bin.("+chunk+" )" 204 self.debug("Parse for media composition is %s", binstr) 205 206 bin = gst.parse_launch(binstr) 207 pad = bin.find_unconnected_pad(gst.PAD_SINK) 208 ghostpad = gst.GhostPad(mediatype + "-feederpad", pad) 209 bin.add_pad(ghostpad) 210 211 pipeline.add(bin) 212 syncidentity.get_pad('src').link(ghostpad) 213 214 return pipeline
215
216 - def _createDefaultSources(self, properties):
217 if self._hasVideo: 218 vsrc = videotest_gnl_src("videotestdefault", 0, 2**63 - 1, 219 2**31 - 1, properties.get('video-pattern', None)) 220 self.videocomp.add(vsrc) 221 222 if self._hasAudio: 223 asrc = audiotest_gnl_src("videotestdefault", 0, 2**63 - 1, 224 2**31 - 1, properties.get('audio-wave', None)) 225 self.audiocomp.add(asrc)
226
227 - def set_master_clock(self, ip, port, base_time):
228 raise NotImplementedError("Playlist producer doesn't support slaving")
229
230 - def provide_master_clock(self, port):
231 # Most of this copied from feedcomponent010, but changed in various 232 # ways. Refactor the base class? 233 if self.medium: 234 ip = self.medium.getIP() 235 else: 236 ip = "127.0.0.1" 237 238 clock = self.pipeline.get_clock() 239 self.clock_provider = gst.NetTimeProvider(clock, None, port) 240 # small window here but that's ok 241 self.clock_provider.set_property('active', False) 242 243 self._master_clock_info = (ip, port, self.basetime) 244 245 return defer.succeed(self._master_clock_info)
246
247 - def get_master_clock(self):
248 return self._master_clock_info
249
250 - def _setupClock(self, pipeline):
251 # Configure our pipeline to use a known basetime and clock. 252 clock = gst.SystemClock() 253 # It doesn't matter too much what this basetime is, so long as we know 254 # the value. 255 self.basetime = clock.get_time() 256 257 # We force usage of the system clock. 258 pipeline.use_clock(clock) 259 # Now we disable default basetime distribution 260 pipeline.set_new_stream_time(gst.CLOCK_TIME_NONE) 261 # And we choose our own basetime... 262 self.debug("Setting basetime of %d", self.basetime) 263 pipeline.set_base_time(self.basetime)
264
265 - def timeReport(self):
266 ts = self.pipeline.get_clock().get_time() 267 self.debug("Pipeline clock is now at %d -> %s", ts, _tsToString(ts)) 268 reactor.callLater(10, self.timeReport)
269
270 - def getCurrentPosition(self):
271 return self.pipeline.query_position(gst.FORMAT_TIME)[0]
272
273 - def scheduleItem(self, item):
274 """ 275 Schedule a given playlist item in our playback compositions. 276 """ 277 start = item.timestamp - self.basetime 278 self.debug("Starting item %s at %d seconds from start: %s", item.uri, 279 start/gst.SECOND, _tsToString(item.timestamp)) 280 281 # If we schedule things to start before the current pipeline position, 282 # gnonlin will adjust this to start now. However, it does this 283 # separately for audio and video, so we start from different points, 284 # thus we're out of sync. 285 # So, always start slightly in the future... 5 seconds seems to work 286 # fine in practice. 287 now = self.getCurrentPosition() 288 neareststarttime = now + 5 * gst.SECOND 289 290 if start < neareststarttime: 291 if start + item.duration < neareststarttime: 292 self.debug("Item too late; skipping entirely") 293 return False 294 else: 295 change = neareststarttime - start 296 self.debug("Starting item with offset %d", change) 297 item.duration -= change 298 item.offset += change 299 start = neareststarttime 300 301 end = start + item.duration 302 timeuntilend = end - now 303 # After the end time, remove this item from the composition, otherwise 304 # it will continue to use huge gobs of memory and lots of threads. 305 reactor.callLater(timeuntilend/gst.SECOND + 5, 306 self.unscheduleItem, item) 307 308 if self._hasVideo and item.hasVideo: 309 self.debug("Adding video source with start %d, duration %d, " 310 "offset %d", start, item.duration, item.offset) 311 vsrc = file_gnl_src(None, item.uri, self.videocaps, 312 start, item.duration, item.offset, 0) 313 self.videocomp.add(vsrc) 314 self._vsrcs[item] = vsrc 315 if self._hasAudio and item.hasAudio: 316 self.debug("Adding audio source with start %d, duration %d, " 317 "offset %d", start, item.duration, item.offset) 318 asrc = file_gnl_src(None, item.uri, self.audiocaps, 319 start, item.duration, item.offset, 0) 320 self.audiocomp.add(asrc) 321 self._asrcs[item] = asrc 322 self.debug("Done scheduling: start at %s, end at %s", 323 _tsToString(start + self.basetime), 324 _tsToString(start + self.basetime + item.duration)) 325 return True
326
327 - def unscheduleItem(self, item):
328 self.debug("Unscheduling item at uri %s", item.uri) 329 if self._hasVideo and item.hasVideo and item in self._vsrcs: 330 vsrc = self._vsrcs.pop(item) 331 self.videocomp.remove(vsrc) 332 vsrc.set_state(gst.STATE_NULL) 333 if self._hasAudio and item.hasAudio and item in self._asrcs: 334 asrc = self._asrcs.pop(item) 335 self.audiocomp.remove(asrc) 336 asrc.set_state(gst.STATE_NULL)
337
338 - def adjustItemScheduling(self, item):
339 if self._hasVideo and item.hasVideo: 340 vsrc = self._vsrcs[item] 341 vsrc.props.start = item.timestamp 342 vsrc.props.duration = item.duration 343 vsrc.props.media_duration = item.duration 344 if self._hasAudio and item.hasAudio: 345 asrc = self._asrcs[item] 346 asrc.props.start = item.timestamp 347 asrc.props.duration = item.duration 348 asrc.props.media_duration = item.duration
349
350 - def addPlaylist(self, data):
351 self.playlistparser.parseData(data)
352
353 - def create_pipeline(self):
354 props = self.config['properties']; 355 356 self._playlistfile = props.get('playlist', None) 357 self._playlistdirectory = props.get('playlist-directory', None) 358 self._baseDirectory = props.get('base-directory', None) 359 360 self._width = props.get('width', 320) 361 self._height = props.get('height', 240) 362 self._framerate = props.get('framerate', (15, 1)) 363 self._samplerate = props.get('samplerate', 44100) 364 self._channels = props.get('channels', 2) 365 366 self._hasAudio = props.get('audio', True) 367 self._hasVideo = props.get('video', True) 368 369 pipeline = self._buildPipeline() 370 self._setupClock(pipeline) 371 372 self._createDefaultSources(props) 373 374 return pipeline
375
376 - def _watchDirectory(self, dir):
377 self.debug("Watching directory %s", dir) 378 self._filesAdded = {} 379 380 self._directoryWatcher = watcher.DirectoryWatcher(dir) 381 self._directoryWatcher.subscribe(fileChanged=self._watchFileChanged, 382 fileDeleted=self._watchFileDeleted) 383 384 # in the start call watcher should find all the existing 385 # files, so we block discovery while the watcher starts 386 self.playlistparser.blockDiscovery() 387 try: 388 self._directoryWatcher.start() 389 finally: 390 self.playlistparser.unblockDiscovery()
391
392 - def _watchFileDeleted(self, file):
393 self.debug("File deleted: %s", file) 394 if file in self._filesAdded: 395 self.playlistparser.playlist.removeItems(file) 396 self._filesAdded.pop(file) 397 398 self._cleanMessage(file)
399
400 - def _cleanMessage(self, file):
401 # There's no message removal API! We have to do this instead. Ick? 402 msgid = ("playlist-parse-error", file) 403 for m in self.state.get('messages'): 404 if m.id == msgid: 405 self.state.remove('messages', m)
406
407 - def _watchFileChanged(self, file):
408 self.debug("File changed: %s", file) 409 if file in self._filesAdded: 410 self.debug("Removing existing items for changed playlist") 411 self.playlistparser.playlist.removeItems(file) 412 413 self._filesAdded[file] = None 414 self._cleanMessage(file) 415 try: 416 self.debug("Parsing file: %s", file) 417 self.playlistparser.parseFile(file, id=file) 418 except fxml.ParserError, e: 419 self.warning("Failed to parse playlist file: %r", e) 420 # Since this isn't done directly via the remote method, add a 421 # message so people can find out that it failed... 422 # Use a tuple including the filename to identify the warning, so we 423 # can add/remove one per file 424 msgid = ("playlist-parse-error", file) 425 self.addMessage( 426 messages.Warning(T_(N_( 427 "Failed to parse a playlist from file %s: %s" % 428 (file, e))), id=msgid))
429
430 - def do_setup(self):
431 playlist = playlistparser.Playlist(self) 432 self.playlistparser = playlistparser.PlaylistXMLParser(playlist) 433 if self._baseDirectory: 434 self.playlistparser.setBaseDirectory(self._baseDirectory) 435 436 if self._playlistfile: 437 try: 438 self.playlistparser.parseFile(self._playlistfile) 439 except fxml.ParserError, e: 440 self.warning("Failed to parse playlist file: %r", e) 441 442 if self._playlistdirectory: 443 self._watchDirectory(self._playlistdirectory) 444 445 reactor.callLater(10, self.timeReport)
446