Package flumotion :: Package component :: Package producers :: Package playlist :: Module playlistparser
[hide private]

Source Code for Module flumotion.component.producers.playlist.playlistparser

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import gst 
 23  from gst.extend import discoverer 
 24   
 25  import time 
 26  import calendar 
 27  from StringIO import StringIO 
 28   
 29  from xml.dom import Node 
 30   
 31  from twisted.internet import reactor 
 32   
 33  from flumotion.common import log, fxml 
 34   
class PlaylistItem(object, log.Loggable):
    """
    A single playlist entry, kept as a node of a doubly linked list.

    @ivar id:        identifier the item was added under; L{Playlist} groups
                     items by this value
    @ivar timestamp: time at which the item starts
    @ivar uri:       location of the media to play
    @ivar offset:    offset into the media
    @ivar duration:  length of time the item plays for
    @ivar hasAudio:  whether the media has audio; True until overridden
    @ivar hasVideo:  whether the media has video; True until overridden
    @ivar next:      following item in the list, or None
    @ivar prev:      preceding item in the list, or None
    """

    def __init__(self, id, timestamp, uri, offset, duration):
        # A fresh item is not linked to any neighbours yet.
        self.prev = self.next = None

        # Both stream kinds are assumed present until the creator of the
        # item says otherwise.
        self.hasVideo = self.hasAudio = True

        self.id, self.uri = id, uri
        self.timestamp, self.offset, self.duration = (
            timestamp, offset, duration)
48
class Playlist(object, log.Loggable):
    """
    Time-ordered playlist, stored as a doubly linked list of
    L{PlaylistItem} objects and mirrored into a producer.

    The producer is asked for the current playback position
    (getCurrentPosition) and is told about every change through
    scheduleItem, unscheduleItem and adjustItemScheduling.
    """
    logCategory = 'playlist-list'

    def __init__(self, producer):
        """
        Create an initially empty playlist

        @param producer: object that schedules the items for playback
        """
        self.items = None # PlaylistItem linked list
        self._itemsById = {} # id -> list of PlaylistItem added under that id

        self.producer = producer

    def _findItem(self, position):
        """
        Get the item that is playing at position.

        @returns: the item whose time span strictly contains position,
                  or None if there is no such item
        """
        cur = self.items
        while cur:
            if cur.timestamp < position and \
                    cur.timestamp + cur.duration > position:
                return cur
            if cur.timestamp > position:
                return None # fail without having to iterate over everything
            cur = cur.next
        return None

    def _getCurrentItem(self):
        """
        Get the item at the producer's current playback position, or None.
        """
        position = self.producer.getCurrentPosition()
        item = self._findItem(position)
        self.debug("Item %r found as current for playback position %d",
            item, position)
        return item

    def _forgetItem(self, item):
        # Drop item from the id index, removing the id once it has no
        # items left.
        entries = self._itemsById.get(item.id)
        if not entries:
            return
        if item in entries:
            entries.remove(item)
        if not entries:
            del self._itemsById[item.id]

    def removeItems(self, id):
        """
        Remove the items that were added under id.

        Items that start before the end of the currently playing item
        (the current item included) stay linked and scheduled. The id
        itself is always forgotten.
        """
        current = self._getCurrentItem()

        if id not in self._itemsById:
            return

        items = self._itemsById[id]
        for item in items:
            if (current and item.timestamp < current.timestamp +
                    current.duration):
                self.debug("Not removing current item!")
                continue
            self.unlinkItem(item)
            self.producer.unscheduleItem(item)

        del self._itemsById[id]

    def addItem(self, id, timestamp, uri, offset, duration, hasAudio, hasVideo):
        """
        Add an item to the playlist.

        This may remove overlapping entries, or adjust timestamps/durations of
        entries to make the new one fit.

        @returns: the new L{PlaylistItem}, or None if the item starts
                  before the end of the currently playing item or the
                  producer failed to schedule it
        """
        current = self._getCurrentItem()
        if current and timestamp < current.timestamp + current.duration:
            self.warning("New object at uri %s starts during current object, "
                "cannot add", uri)
            return None
        # We don't care about anything older than now; drop references to them
        if current:
            self.items = current

        newitem = PlaylistItem(id, timestamp, uri, offset, duration)
        newitem.hasAudio = hasAudio
        newitem.hasVideo = hasVideo

        self._itemsById.setdefault(id, []).append(newitem)

        newEnd = newitem.timestamp + newitem.duration

        # preceding starts strictly before the new item
        preceding = None
        item = self.items
        while item and item.timestamp < newitem.timestamp:
            preceding = item
            item = item.next

        # item is now the first entry that does not start strictly before
        # the new item (the head of the list when there is no preceding
        # item).
        firstOverlapped = item

        # following starts after the new item, and ends after the end of
        # the new item
        following = None
        while item:
            if (item.timestamp > newitem.timestamp and
                    item.timestamp + item.duration > newEnd):
                following = item
                break
            item = item.next

        # Everything from firstOverlapped up to following (which might be
        # None) is covered by the new item and has to go. This is done
        # even when the new item becomes the head of the list, so that
        # covered entries are unscheduled rather than silently dropped.
        cur = firstOverlapped
        while cur is not following:
            self._forgetItem(cur)
            self.producer.unscheduleItem(cur)
            cur = cur.next

        # update links.
        if preceding:
            preceding.next = newitem
            newitem.prev = preceding
        else:
            self.items = newitem

        if following:
            newitem.next = following
            following.prev = newitem

        # Duration adjustments -> Reflect into gnonlin timeline
        if preceding and \
                preceding.timestamp + preceding.duration > newitem.timestamp:
            shortened = newitem.timestamp - preceding.timestamp
            self.debug("Changing duration of previous item from %d to %d",
                preceding.duration, shortened)
            preceding.duration = shortened
            self.producer.adjustItemScheduling(preceding)

        if following and newEnd > following.timestamp:
            self.debug("Changing timestamp of next item from %d to %d to fit",
                following.timestamp, newEnd)
            # Keep the end of the following item where it was: what is cut
            # from its start is taken off its duration.
            following.duration -= newEnd - following.timestamp
            following.timestamp = newEnd
            self.producer.adjustItemScheduling(following)

        # Then we need to actually add newitem into the gnonlin timeline
        if not self.producer.scheduleItem(newitem):
            self.debug("Failed to schedule item, unlinking")
            # Failed to schedule it. Forget it completely, so that a later
            # removeItems() does not unlink/unschedule a stale item.
            self.unlinkItem(newitem)
            self._forgetItem(newitem)
            return None

        return newitem

    def unlinkItem(self, item):
        """
        Take item out of the linked list by joining its neighbours.

        The links stored on item itself are left untouched.
        """
        if item.prev:
            item.prev.next = item.next
        else:
            # item was the head of the list
            self.items = item.next

        if item.next:
            item.next.prev = item.prev
198
class PlaylistParser(object, log.Loggable):
    """
    Queue of playlist entries waiting to be inspected with the GStreamer
    discoverer before they are added to a L{Playlist}.

    Pending entries are (filename, timestamp, duration, offset, id)
    tuples. They are discovered one at a time.
    """
    logCategory = 'playlist-parse'

    def __init__(self, playlist):
        """
        @param playlist: the playlist discovered entries are added to
        """
        self.playlist = playlist

        self._pending_items = []
        self._discovering = False
        self._discovering_blocked = 0 # nesting count of blockDiscovery()

        self._baseDirectory = None

    def setBaseDirectory(self, baseDir):
        """
        Set the directory that relative filenames are resolved against.
        A trailing '/' is appended if missing.
        """
        if not baseDir.endswith('/'):
            baseDir = baseDir + '/'
        self._baseDirectory = baseDir

    def blockDiscovery(self):
        """
        Prevent playlist parser from running discoverer on any pending
        playlist entries. Multiple subsequent invocations will require
        the same corresponding number of calls to L{unblockDiscovery}
        to resume discovery.
        """
        self._discovering_blocked += 1
        self.debug('  blocking discovery: %d' % self._discovering_blocked)

    def unblockDiscovery(self):
        """
        Resume discovering of any pending playlist entries. If
        L{blockDiscovery} was called multiple times multiple
        invocations of unblockDiscovery will be required to unblock
        the discoverer.
        """
        if self._discovering_blocked > 0:
            self._discovering_blocked -= 1
        self.debug('unblocking discovery: %d' % self._discovering_blocked)
        if self._discovering_blocked < 1:
            self.startDiscovery()

    def startDiscovery(self, doSort=True):
        """
        Initiate discovery of any pending playlist entries.

        Nothing happens if discovery is already running, is blocked, or
        there is nothing pending.

        @param doSort: should the pending entries be ordered
                       chronologically before initiating discovery
        @type  doSort: bool
        """
        self.log('startDiscovery: discovering: %s, block: %d, pending: %d' %
            (self._discovering, self._discovering_blocked,
             len(self._pending_items)))
        if self._discovering or self._discovering_blocked >= 1:
            return
        if not self._pending_items:
            return

        if doSort:
            self._sortPending()
        self._discoverPending()

    def _sortPending(self):
        """
        Order the pending entries by timestamp; entries with equal
        timestamps are ordered by comparing the whole entry.
        """
        self.debug('sort pending: %d' % len(self._pending_items))
        if not self._pending_items:
            return
        self._pending_items = sorted(self._pending_items,
            key=lambda elt: (elt[1], elt))

    def _discoverPending(self):
        """
        Run the discoverer on the first pending entry. Once it is done the
        entry is added to the playlist (if it is media of non-zero length)
        and the next entry is tried one second later.
        """
        def _discovered(disc, is_media):
            self.debug("Discovered!")
            # Handle the result from the reactor thread.
            reactor.callFromThread(_discoverer_done, disc, is_media)

        def _discoverer_done(disc, is_media):
            # 'item' is the entry popped from the pending list below.
            if is_media:
                self.debug("Discovery complete, media found")
                filename, timestamp, duration, offset, id = item
                uri = "file://" + filename

                hasA = disc.is_audio
                hasV = disc.is_video
                # Only look at the lengths of the streams that exist; taking
                # the minimum over both would give a zero length for
                # audio-only or video-only files.
                if hasA and hasV:
                    durationDiscovered = min(disc.audiolength,
                        disc.videolength)
                elif hasA:
                    durationDiscovered = disc.audiolength
                elif hasV:
                    durationDiscovered = disc.videolength
                else:
                    durationDiscovered = 0

                # Never play for longer than the media lasts
                if not duration or duration > durationDiscovered:
                    duration = durationDiscovered

                # The requested range does not fit: play from the start
                if duration + offset > durationDiscovered:
                    offset = 0

                if duration > 0:
                    self.playlist.addItem(id, timestamp, uri, offset, duration,
                        hasA, hasV)
                else:
                    self.warning("Duration of item is zero, not adding")
            else:
                self.warning("Discover failed to find media in %s", item[0])

            # We don't want to burn too much cpu discovering all the files;
            # this throttles the discovery rate to a reasonable level
            self.debug("Continuing on to next file in one second")
            reactor.callLater(1, self._discoverPending)

        if not self._pending_items:
            self.debug("No more files to discover")
            self._discovering = False
            return

        if self._discovering_blocked > 0:
            self.debug("Discovering blocked: %d" % self._discovering_blocked)
            self._discovering = False
            return

        self._discovering = True

        item = self._pending_items.pop(0)

        self.debug("Discovering file %s", item[0])
        disc = discoverer.Discoverer(item[0])

        disc.connect('discovered', _discovered)
        disc.discover()

    def addItemToPlaylist(self, filename, timestamp, duration, offset, id):
        """
        Queue a file for discovery and later addition to the playlist.

        Entries that end before the current time are ignored. When no
        duration is given the start time is compared instead.

        @param filename:  path of the media file; relative paths are
                          resolved against the base directory, if set
        @param timestamp: start time, in the units of time.time() *
                          gst.SECOND
        @param duration:  play length in the same units, or None
        @param offset:    offset into the media
        @param id:        identifier to add the item under
        """
        # We only want to add it if it's plausibly schedulable.
        end = timestamp
        if duration is not None:
            end += duration
        if end < time.time() * gst.SECOND:
            self.debug("Early-out: ignoring add for item in past")
            return

        # startswith() rather than filename[0], so that an empty filename
        # does not raise IndexError
        if self._baseDirectory and not filename.startswith('/'):
            filename = self._baseDirectory + filename

        self._pending_items.append((filename, timestamp, duration, offset, id))

        # Now launch the discoverer for any pending items
        self.startDiscovery()
337
class PlaylistXMLParser(PlaylistParser):
    """
    Playlist parser for XML documents: a 'playlist' root element holding
    'entry' elements with 'filename' and 'time' attributes and optional
    'duration' and 'offset' attributes.
    """
    logCategory = 'playlist-xml'

    def parseData(self, data):
        """
        Parse playlist XML document data
        """
        file = StringIO(data)
        self.parseFile(file)

    def replaceFile(self, file, id):
        """
        Remove the playlist items previously added under id, then parse
        file and add its entries under the same id.
        """
        self.playlist.removeItems(id)
        self.parseFile(file, id)

    def parseFile(self, file, id=None):
        """
        Parse a playlist file. Adds the contents of the file to the existing
        playlist, overwriting any existing entries for the same time period.

        @param file: the file to parse
        @param id:   identifier to add the entries under

        @raises fxml.ParserError: if the root node is not 'playlist', or
                                  an entry lacks a required attribute or
                                  has an invalid timestamp
        """
        parser = fxml.Parser()

        root = parser.getRoot(file)

        node = root.documentElement
        self.debug("Parsing playlist from file %s", file)
        if node.nodeName != 'playlist':
            raise fxml.ParserError("Root node is not 'playlist'")

        # Hold discovery back until all the entries are queued; it is
        # resumed even if an entry fails to parse.
        self.blockDiscovery()
        try:
            for child in node.childNodes:
                if child.nodeType == Node.ELEMENT_NODE and \
                        child.nodeName == 'entry':
                    self.debug("Parsing entry")
                    self._parsePlaylistEntry(parser, child, id)
        finally:
            self.unblockDiscovery()

    # A simplified private version of this code from fxml without the
    # undesirable unicode->str conversions.
    def _parseAttributes(self, node, required, optional):
        """
        Collect attribute values from node.

        @returns: list with the values of the required attributes followed
                  by those of the optional attributes, with None for each
                  optional attribute that is absent
        @raises fxml.ParserError: if a required attribute is missing
        """
        out = []
        for k in required:
            if not node.hasAttribute(k):
                raise fxml.ParserError("Missing required attribute %s" % k)
            out.append(node.getAttribute(k))

        for k in optional:
            if node.hasAttribute(k):
                out.append(node.getAttribute(k))
            else:
                out.append(None)
        return out

    def _parsePlaylistEntry(self, parser, entry, id):
        """
        Convert one 'entry' element and queue it with addItemToPlaylist.

        'duration' and 'offset' are given in seconds, fractions allowed,
        and are converted to multiples of gst.SECOND.
        """
        mandatory = ['filename', 'time']
        optional = ['duration', 'offset']

        (filename, timestamp, duration, offset) = self._parseAttributes(
            entry, mandatory, optional)

        if duration is not None:
            duration = int(float(duration) * gst.SECOND)
        if offset is None:
            offset = 0
        # Parsed like duration, so that fractional offsets are accepted too
        offset = int(float(offset) * gst.SECOND)

        timestamp = self._parseTimestamp(timestamp)

        # Assume UTF-8 filesystem.
        filename = filename.encode("UTF-8")

        self.addItemToPlaylist(filename, timestamp, duration, offset, id)

    def _parseTimestamp(self, ts):
        """
        Take TS in YYYY-MM-DDThh:mm:ss.ssZ format, return timestamp in
        nanoseconds since the epoch

        @raises fxml.ParserError: if ts does not have that format
        """
        # time.strptime() doesn't handle the fractional seconds part. We ignore
        # it entirely, after verifying that it has the right format.
        tsmain, trailing = ts[:-4], ts[-4:]
        # The length check keeps the indexing below safe for timestamps
        # shorter than the '.ssZ' suffix.
        if len(trailing) != 4 or \
                trailing[0] != '.' or trailing[3] != 'Z' or \
                not trailing[1].isdigit() or not trailing[2].isdigit():
            raise fxml.ParserError("Invalid timestamp %s" % ts)
        fmt = "%Y-%m-%dT%H:%M:%S"

        try:
            timestruct = time.strptime(tsmain, fmt)
            return int(calendar.timegm(timestruct) * gst.SECOND)
        except ValueError:
            raise fxml.ParserError("Invalid timestamp %s" % ts)
430