Package flumotion :: Package component :: Package consumers :: Package disker :: Module disker
[hide private]

Source Code for Module flumotion.component.consumers.disker.disker

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import errno 
 23  import os 
 24  import time 
 25  from datetime import datetime 
 26   
 27  import gobject 
 28  import gst 
 29  import time 
 30   
 31  from twisted.internet import reactor 
 32   
 33  from flumotion.component import feedcomponent 
 34  from flumotion.common import log, gstreamer, pygobject, messages, errors 
 35  from flumotion.common import common 
 36   
 37  # proxy import 
 38  from flumotion.component.component import moods 
 39  from flumotion.common.pygobject import gsignal 
 40   
 41  from flumotion.common.messages import N_ 
 42  T_ = messages.gettexter('flumotion') 
 43   
 44  __all__ = ['Disker'] 
 45   
 46   
 47  """ 
 48  Disker has a property 'ical-schedule'. This allows an ical file to be 
 49  specified in the config and have recordings scheduled based on events. 
 50  This file will be monitored for changes and events reloaded if this 
 51  happens. 
 52   
 53  The filename of a recording started from an ical file will be produced 
 54  via passing the ical event summary through strftime, so that an archive 
 55  can encode the date and time that it was begun. 
 56   
 57  The time that will be given to strftime will be given in the timezone of 
 58  the ical event. In practice this will either be UTC or the local time of 
 59  the machine running the disker, as the ical scheduler does not 
 60  understand arbitrary timezones. 
 61  """ 
 62   
 63  try: 
 64      # icalendar and dateutil modules needed for scheduling recordings 
 65      from icalendar import Calendar 
 66      from dateutil import rrule 
 67      HAS_ICAL = True 
 68  except: 
 69      HAS_ICAL = False 
 70   
class DiskerMedium(feedcomponent.FeedComponentMedium):
    """
    Medium exposing the disker's recording controls to remote callers.

    Every remote_* method simply forwards to the component stored in
    self.comp.
    """

    def remote_stopRecording(self):
        """
        Called when the admin ui wants to stop recording.

        Call changeFilename to restart.
        """
        self.comp.stop_recording()

    def remote_changeFilename(self, filenameTemplate=None):
        """
        Called when the admin ui wants to change the filename.

        This starts recording if the disker isn't currently writing to
        disk.

        @param filenameTemplate: template handed unchanged to the
                                 component's change_filename()
        """
        self.comp.change_filename(filenameTemplate)

    def remote_scheduleRecordings(self, ical):
        """
        Schedule recordings from the contents of an ical file.

        @param ical: ical data, as a string
        """
        # The previous body called self.addEvents(), which this class does
        # not define; the events have to be added to the component's
        # scheduler. Disker.parse_ical() already does exactly that, and
        # also handles the "ical modules not installed" case.
        self.comp.parse_ical(ical)

    def remote_notifyState(self):
        """
        Called when the admin ui wants updated state (current filename
        info).
        """
        self.comp.update_ui_state()
90
class Disker(feedcomponent.ParseLaunchComponent, log.Loggable):
    """
    Consumer component that writes the incoming stream to files on disk.

    The stream is handed to a multifdsink element named 'fdsink';
    recording is started and stopped by adding and removing the file
    descriptor of the current output file on that element.
    """
    # medium class for this component
    componentMediumClass = DiskerMedium
    # NOTE(review): not read anywhere in the visible code; presumably
    # consumed by the base class - confirm
    checkOffset = True
    # returned as-is by get_pipeline_string()
    pipe_template = 'multifdsink sync-method=1 name=fdsink mode=1 sync=false'
    # file object currently being recorded to, None when not recording
    file = None
    # output directory, taken from the 'directory' property
    directory = None
    # full path of the most recently opened output file
    location = None
    # caps stored by _notify_caps_cb(); None until the first notification
    caps = None
def init(self):
    """
    Register the ui state keys the disker publishes, with their initial
    values.
    """
    initialState = (
        ('filename', None),
        ('recording', False),
        # whether the optional ical modules could be imported
        ('can-schedule', HAS_ICAL),
    )
    for key, value in initialState:
        self.uiState.addKey(key, value)
104
def get_pipeline_string(self, properties):
    """
    Validate the rotation properties, arm the requested rotation and
    return the pipeline description.

    @param properties: the component's configuration properties
    @returns: the pipeline template string
    @raises errors.ComponentSetupHandledError: when 'rotate-type' is
        invalid, or the property it names ('size' or 'time') is missing;
        an error message is added to the component first
    """
    self.directory = properties['directory']

    self.fixRenamedProperties(properties, [('rotateType', 'rotate-type')])

    mode = properties.get('rotate-type', 'none')

    def _fail(text):
        # report the configuration problem, then abort setup
        self.addMessage(messages.Error(T_(text, mode), id='rotate-type'))
        raise errors.ComponentSetupHandledError()

    # validate rotate-type and size/time properties first
    if mode not in ('none', 'size', 'time'):
        _fail(N_(
            "The configuration property 'rotate-type' should be set to "
            "'size', time', or 'none', not '%s'. "
            "Please fix the configuration."))

    # size and time types need the property of the same name specified
    if mode != 'none' and mode not in properties:
        _fail(N_(
            "The configuration property '%s' should be set. "
            "Please fix the configuration."))

    # now act on the properties
    rotators = {
        'size': self.setSizeRotate,
        'time': self.setTimeRotate,
    }
    if mode in rotators:
        rotators[mode](properties[mode])
        # FIXME: should add a way of saying "do first cycle at this time"

    return self.pipe_template
142
def setTimeRotate(self, time):
    """
    Schedule the first time-based rotation.

    @param time: duration of file (in seconds)
    """
    interval = time
    reactor.callLater(interval, self._rotateTimeCallback, interval)
148
def setSizeRotate(self, size):
    """
    Schedule the first size check for size-based rotation.

    @param size: size of file (in bytes)
    """
    # seconds until the first check
    firstCheckDelay = 5
    reactor.callLater(firstCheckDelay, self._rotateSizeCallback, size)
154
def _rotateTimeCallback(self, time):
    """
    Switch to a new file, then schedule the next rotation.

    @param time: rotation interval (in seconds)
    """
    self.change_filename()

    # re-arm ourselves for the next interval
    interval = time
    reactor.callLater(interval, self._rotateTimeCallback, interval)
160
def _rotateSizeCallback(self, size):
    """
    Check the size of the current file, rotate it if it has grown past
    the limit, and schedule the next check.

    @param size: maximum size of file (in bytes)
    """
    if not self.location:
        self.warning('Cannot rotate file, no file location set')
    else:
        try:
            currentSize = os.stat(self.location).st_size
        except OSError:
            # e.g. the file was removed underneath us; keep polling
            # instead of letting the exception end the check cycle
            self.warning('Cannot rotate file, unable to stat %s',
                         self.location)
        else:
            if currentSize > size:
                self.change_filename()

    # Add a new one. This used to re-arm _rotateTimeCallback, which
    # rotated unconditionally every `size` seconds and never checked
    # the size again.
    reactor.callLater(5, self._rotateSizeCallback, size)
170
def get_mime(self):
    """
    @returns: the name of the first structure of the stored caps, or
              None when no caps are stored yet
    """
    if not self.caps:
        return None
    structure = self.caps.get_structure(0)
    return structure.get_name()
174
def get_content_type(self):
    """
    @returns: the value of get_mime(), with a boundary parameter appended
              for multipart streams
    """
    mime = self.get_mime()
    if mime != 'multipart/x-mixed-replace':
        return mime
    return mime + ";boundary=ThisRandomString"
180
181 - def change_filename(self, filenameTemplate=None, timeOrTuple=None):
182 """ 183 @param filenameTemplate: strftime formatted string to decide filename 184 @param timeOrTuple: a valid time to pass to strftime, defaulting 185 to time.localtime(). A 9-tuple may be passed instead. 186 """ 187 mime = self.get_mime() 188 if mime == 'application/ogg': 189 ext = 'ogg' 190 elif mime == 'multipart/x-mixed-replace': 191 ext = 'multipart' 192 elif mime == 'audio/mpeg': 193 ext = 'mp3' 194 elif mime == 'video/x-msvideo': 195 ext = 'avi' 196 elif mime == 'video/x-ms-asf': 197 ext = 'asf' 198 elif mime == 'audio/x-flac': 199 ext = 'flac' 200 elif mime == 'audio/x-wav': 201 ext = 'wav' 202 elif mime == 'video/x-matroska': 203 ext = 'mkv' 204 elif mime == 'video/x-dv': 205 ext = 'dv' 206 elif mime == 'video/x-flv': 207 ext = 'flv' 208 elif mime == 'video/mpegts': 209 ext = 'ts' 210 else: 211 ext = 'data' 212 213 self.stop_recording() 214 215 sink = self.get_element('fdsink') 216 if sink.get_state() == gst.STATE_NULL: 217 sink.set_state(gst.STATE_READY) 218 219 filename = "" 220 if not filenameTemplate: 221 filenameTemplate = self._defaultFilenameTemplate 222 filename = "%s.%s" % (common.strftime(filenameTemplate, 223 timeOrTuple or time.localtime()), ext) 224 self.location = os.path.join(self.directory, filename) 225 self.info("Changing filename to %s", self.location) 226 try: 227 self.file = open(self.location, 'a') 228 except IOError, e: 229 self.warning("Failed to open output file %s: %s", 230 self.location, log.getExceptionMessage(e)) 231 m = messages.Error(T_(N_("Failed to open output file " 232 "%s. Check your permissions." 233 % (self.location,)))) 234 self.addMessage(m) 235 return 236 self._plug_recording_started(self.file, self.location) 237 sink.emit('add', self.file.fileno()) 238 self.uiState.set('filename', self.location) 239 self.uiState.set('recording', True) 240 241 if self.symlink_to_current_recording: 242 self.update_symlink(self.location, 243 self.symlink_to_current_recording)
244 266
def stop_recording(self):
    """
    Stop writing to the current file, if there is one.

    Removes the file descriptor from the sink, notifies the plugs, resets
    the ui state and, when configured, updates the last-recording symlink.
    """
    sink = self.get_element('fdsink')
    if sink.get_state() == gst.STATE_NULL:
        sink.set_state(gst.STATE_READY)

    # nothing to do when we are not recording
    if not self.file:
        return

    self.file.flush()
    sink.emit('remove', self.file.fileno())
    self._plug_recording_stopped(self.file, self.location)
    self.file = None

    for key, value in (('filename', None), ('recording', False)):
        self.uiState.set(key, value)

    linkName = self.symlink_to_last_recording
    if linkName:
        self.update_symlink(self.location, linkName)
282
def _notify_caps_cb(self, pad, param):
    """
    notify::caps handler for the sink pad: store the negotiated caps and,
    the first time caps arrive, start recording if configured to do so.

    @param pad:   the pad whose caps changed
    @param param: unused
    """
    caps = pad.get_negotiated_caps()
    if caps is None:
        return

    caps_str = gstreamer.caps_repr(caps)
    self.debug('Got caps: %s' % caps_str)

    new = self.caps is None
    if not new:
        # report the caps we are about to replace; the message used to
        # print the new caps here
        self.warning('Already had caps: %s, replacing' %
                     gstreamer.caps_repr(self.caps))

    self.debug('Storing caps: %s' % caps_str)
    self.caps = caps

    if new and self._recordAtStart:
        # deferred through the reactor rather than called directly
        reactor.callLater(0, self.change_filename,
                          self._startFilenameTemplate)
# callback for when a client is removed so we can figure out
# errors
def _client_removed_cb(self, element, arg0, client_status):
    """
    client-removed handler: when the removal was caused by an error,
    schedule _client_error_cb through the reactor.

    @param element:       unused
    @param arg0:          unused
    @param client_status: status reported with the removal
    """
    # 4 is the status value that means "error"
    removedBecauseOfError = (client_status == 4)
    if removedBecauseOfError:
        reactor.callFromThread(self._client_error_cb)
309
def _client_error_cb(self):
    """
    Handle a write error on the output file: close the file, make the
    component sad and add an error message.
    """
    # The file may already be gone if recording was stopped, or another
    # error was handled, before this deferred call ran.
    if self.file:
        self.file.close()
        self.file = None
    # make element sad
    self.setMood(moods.sad)
    messageId = "error-writing-%s" % self.location
    # the location is passed as an argument to T_ so that the untouched
    # format string is what gets looked up for translation
    m = messages.Error(T_(N_(
        "Error writing to file %s.  Maybe disk is full."),
        self.location),
        id=messageId, priority=40)
    self.addMessage(m)
321
def configure_pipeline(self, pipeline, properties):
    """
    Read the recording related properties, set up the optional ical
    scheduler and connect to the signals of the 'fdsink' element.

    @param pipeline:   unused here
    @param properties: the component's configuration properties
    """
    self.debug('configure_pipeline for disker')
    self.symlink_to_last_recording = \
        properties.get('symlink-to-last-recording', None)
    self.symlink_to_current_recording = \
        properties.get('symlink-to-current-recording', None)
    self._recordAtStart = properties.get('start-recording', True)
    self._defaultFilenameTemplate = properties.get('filename',
        '%s.%%Y%%m%%d-%%H%%M%%S' % self.getName())
    self._startFilenameTemplate = self._defaultFilenameTemplate

    # Always define the attribute so parse_ical() can test it even when
    # no schedule is configured or the schedule fails to parse.
    self.icalScheduler = None
    icalfn = properties.get('ical-schedule')
    if HAS_ICAL and icalfn:
        from flumotion.component.base import scheduler
        try:
            # NOTE(review): the file object is handed to the scheduler
            # and never closed here - confirm the scheduler owns it
            self.icalScheduler = scheduler.ICalScheduler(open(
                icalfn, 'r'))
            self.icalScheduler.subscribe(self.eventStarted,
                                         self.eventStopped)
            currentEvents = self.icalScheduler.getCurrentEvents()
            if currentEvents:
                # NOTE(review): eventStarted() uses event.content as the
                # template; confirm getCurrentEvents() returns templates
                # and not event objects
                self._startFilenameTemplate = currentEvents[0]
                self._recordAtStart = True
            else:
                self._recordAtStart = False
        except ValueError:
            # icalfn is passed as an argument to T_ so the untouched
            # format string is what gets looked up for translation
            m = messages.Warning(T_(N_(
                "Error parsing ical file %s, so not scheduling any"
                " events."), icalfn), id="error-parsing-ical")
            self.addMessage(m)

    elif icalfn:
        warnStr = "An ical file has been specified for " \
                  "scheduling but the necessary modules " \
                  "dateutil and/or icalendar are not installed"
        self.warning(warnStr)
        m = messages.Warning(T_(N_(warnStr)), id="error-parsing-ical")
        self.addMessage(m)

    sink = self.get_element('fdsink')
    sink.get_pad('sink').connect('notify::caps', self._notify_caps_cb)
    # connect to client-removed so we can detect errors in file writing
    sink.connect('client-removed', self._client_removed_cb)

    # set event probe if we should react to video mark events
    react_to_marks = properties.get('react-to-stream-markers', False)
    if react_to_marks:
        pfx = properties.get('stream-marker-filename-prefix', '%03d.')
        self._marker_prefix = pfx
        sink.get_pad('sink').add_event_probe(self._markers_event_probe)
371
def eventStarted(self, event):
    """
    Scheduler callback: start recording to a file named after the event.

    @param event: the event that started; its content is used as the
                  filename template and its start time to expand it
    """
    template = event.content
    startTuple = event.start.timetuple()
    self.change_filename(template, startTuple)
374
def eventStopped(self, event):
    """
    Scheduler callback: stop the current recording.

    @param event: the event that ended (unused)
    """
    self.stop_recording()
377
def parse_ical(self, icsStr):
    """
    Parse ical data and add the events found in it to the scheduler.

    Only a warning is logged when the ical modules are missing, when no
    scheduler has been set up, or when the data contains no events.

    @param icsStr: ical data, as a string
    """
    if not HAS_ICAL:
        self.warning("Cannot parse ICAL; neccesary modules not installed")
        return

    # The attribute only exists once configure_pipeline() has created a
    # scheduler; a plain attribute access raised AttributeError when no
    # 'ical-schedule' was configured.
    icalScheduler = getattr(self, 'icalScheduler', None)
    if not icalScheduler:
        self.warning("Cannot schedule events; no ical scheduler is set up")
        return

    cal = Calendar.from_string(icsStr)
    events = icalScheduler.parseCalendar(cal)
    if events:
        icalScheduler.addEvents(events)
    else:
        self.warning("No events found in the ical string")
389
def _plug_recording_started(self, file, location):
    """
    Tell every plug on the DiskerPlug socket that a recording started.

    @param file:     the file object being recorded to
    @param location: path of that file
    """
    socket = 'flumotion.component.consumers.disker.disker_plug.DiskerPlug'
    # plugs may not be configured with our socket at all, see #732
    if socket in self.plugs:
        for plug in self.plugs[socket]:
            self.debug('invoking recording_started on '
                       'plug %r on socket %s', plug, socket)
            plug.recording_started(file, location)
399
def _plug_recording_stopped(self, file, location):
    """
    Tell every plug on the DiskerPlug socket that a recording stopped.

    @param file:     the file object that was recorded to
    @param location: path of that file
    """
    socket = 'flumotion.component.consumers.disker.disker_plug.DiskerPlug'
    # plugs may not be configured with our socket at all, see #732
    if socket in self.plugs:
        for plug in self.plugs[socket]:
            self.debug('invoking recording_stopped on '
                       'plug %r on socket %s', plug, socket)
            plug.recording_stopped(file, location)
409
def _markers_event_probe(self, element, event):
    """
    Event probe on the sink pad: start or stop recording on
    'FluStreamMark' custom downstream events.

    @param element: unused
    @param event:   the event seen by the probe
    @returns: always True
    """
    if event.type != gst.EVENT_CUSTOM_DOWNSTREAM:
        return True

    mark = event.get_structure()
    if mark.get_name() != 'FluStreamMark':
        return True

    action = mark['action']
    if action == 'start':
        self._on_marker_start(mark['prog_id'])
    elif action == 'stop':
        self._on_marker_stop()
    return True
419
def _on_marker_stop(self):
    """
    A 'stop' stream marker was seen: stop the current recording.
    """
    self.stop_recording()
422
423 - def _on_marker_start(self, data):
424 tmpl = self._defaultFilenameTemplate 425 if self._marker_prefix: 426 try: 427 tmpl = '%s%s' % (self._marker_prefix % data, 428 self._defaultFilenameTemplate) 429 except TypeError, err: 430 m = messages.Warning(T_(N_('Failed expanding filename prefix: ' 431 '%r <-- %r.'), 432 self._marker_prefix, data), 433 id='expand-marker-prefix') 434 self.addMessage(m) 435 self.warning('Failed expanding filename prefix: ' 436 '%r <-- %r; %r' % 437 (self._marker_prefix, data, err)) 438 self.change_filename(tmpl)
439