Package flumotion :: Package launch :: Module main
[hide private]

Source Code for Module flumotion.launch.main

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22   
 23  """ 
 24  Flumotion-launch: A gst-launch analog for Flumotion. 
 25   
 26  The goal of flumotion-launch is to provide an easy way for testing 
 27  flumotion components, without involving much of Flumotion's core code. 
 28   
 29  Flumotion-launch takes a terse gst-launch-like syntax, translates that 
 30  into a component graph, and starts the components. An example would be:: 
 31   
 32    flumotion-launch videotest ! theora-encoder ! ogg-muxer ! http-streamer 
 33   
 34  You can also set properties:: 
 35   
 36    flumotion-launch videotest framerate=15/2 
 37   
 38  You can link specific feeders as well:: 
 39   
 40    flumotion-launch firewire .audio ! vorbis-encoder 
 41    flumotion-launch firewire firewire0.audio ! vorbis-encoder 
 42   
 43  Components can be backreferenced using their names:: 
 44   
 45    flumotion-launch videotest audiotest videotest0. ! ogg-muxer \ 
 46                     audiotest0. ! ogg-muxer0. 
 47   
 48  In addition, components can have plugs:: 
 49   
 50    flumotion-launch http-streamer /apachelogger,logfile=/dev/stdout 
 51   
 52  Flumotion-launch explicitly avoids much of Flumotion's core logic. It 
 53  does not import flumotion.manager, flumotion.admin, or flumotion.worker. 
 54  There is no depgraph, no feed server, no job process. Although it might 
 55  be useful in the future to add a way to use the standard interfaces to 
 56  start components via admin, manager, worker, and job instances, this 
 57  low-level interface is useful in debugging problems and should be kept. 
 58  """ 
 59   
 60   
 61  import os 
 62  import sys 
 63   
 64  from twisted.python import reflect 
 65  from twisted.internet import reactor, defer 
 66   
 67  from flumotion.common import log, common, registry, errors, messages 
 68  from flumotion.twisted import flavors 
 69  from flumotion.common.options import OptionParser 
 70   
 71  from flumotion.launch import parse 
 72   
 73  from gettext import gettext as _ 
 74   
 75  _headings = { 
 76      messages.ERROR:   _('Error'), 
 77      messages.WARNING: _('Warning'), 
 78      messages.INFO:    _('Note') 
 79  } 
 80   
 81   
82 -def err(x):
83 sys.stderr.write(x + '\n') 84 raise SystemExit(1)
85 86
87 -class ComponentWrapper(object, log.Loggable):
88 logCategory = "compwrapper" 89
90 - def __init__(self, config):
91 self.name = config['name'] 92 self.config = config 93 self.procedure = self._getProcedure(config['type']) 94 self.component = None
95
96 - def _getProcedure(self, type):
97 r = registry.getRegistry() 98 c = r.getComponent(type) 99 try: 100 entry = c.getEntryByType('component') 101 except KeyError: 102 err('Component %s has no component entry' % self.name) 103 importname = entry.getModuleName(c.getBase()) 104 try: 105 module = reflect.namedAny(importname) 106 except Exception, e: 107 err('Could not load module %s for component %s: %s' 108 % (importname, self.name, e)) 109 return getattr(module, entry.getFunction())
110
111 - def instantiate(self):
112 errors = [] 113 def haveError(value): 114 translator = messages.Translator() 115 print "%s: %s" % (_headings[value.level], 116 translator.translate(value)) 117 if value.debug: 118 print "Debug information:", value.debug 119 errors.append(value)
120 121 self.component = self.procedure(self.config, 122 haveError=haveError) 123 return not bool(errors)
124
125 - def provideMasterClock(self, port):
126 # rtype: defer.Deferred 127 d = self.component.provide_master_clock(port) 128 return d
129
130 - def set_master_clock(self, ip, port, base_time):
131 return self.component.set_master_clock(ip, port, base_time)
132
133 - def stop(self):
134 return self.component.stop()
135
136 - def feedToFD(self, feedName, fd):
137 self.debug('feedToFD(feedName=%s, %d)' % (feedName, fd)) 138 return self.component.feedToFD(feedName, fd, os.close)
139
140 - def eatFromFD(self, eaterAlias, feedId, fd):
141 self.debug('eatFromFD(eaterAlias=%s, feedId=%s, %d)', 142 eaterAlias, feedId, fd) 143 return self.component.eatFromFD(eaterAlias, feedId, fd)
144
145 -def make_pipes(wrappers):
146 fds = {} # feedcompname:feeder => (fd, start()) 147 wrappersByName = dict([(wrapper.name, wrapper) 148 for wrapper in wrappers]) 149 def starter(wrapper, feedName, write): 150 return lambda: wrapper.feedToFD(feedName, write)
151 for wrapper in wrappers: 152 eaters = wrapper.config.get('eater', {}) 153 for eaterName in eaters: 154 for feedId, eaterAlias in eaters[eaterName]: 155 compName, feederName = common.parseFeedId(feedId) 156 read, write = os.pipe() 157 log.debug('launch', '%s: read from fd %d, write to fd %d', 158 feedId, read, write) 159 start = starter(wrappersByName[compName], feederName, write) 160 fds[feedId] = (read, start) 161 return fds 162
163 -def start_components(wrappers, fds):
164 # figure out the links and start the components 165 166 def provide_clock(): 167 # second phase: clocking 168 need_sync = [x for x in wrappers if x.config['clock-master']] 169 170 if need_sync: 171 master = None 172 for x in need_sync: 173 if x.config['clock-master'] == x.config['avatarId']: 174 master = x 175 break 176 assert master 177 need_sync.remove(master) 178 d = master.provideMasterClock(7600 - 1) # hack! 179 def addNeedSync(clocking): 180 return need_sync, clocking
181 d.addCallback(addNeedSync) 182 return d 183 else: 184 return defer.succeed((None, None)) 185 186 def do_start(synchronization, wrapper): 187 need_sync, clocking = synchronization 188 189 # start it up, with clocking data only if it needs it 190 eaters = wrapper.config.get('eater', {}) 191 for eaterName in eaters: 192 for feedId, eaterAlias in eaters[eaterName]: 193 read, start = fds[feedId] 194 wrapper.eatFromFD(eaterAlias, feedId, read) 195 start() 196 if (not need_sync) or (wrapper not in need_sync) or (not clocking): 197 clocking = None 198 if clocking: 199 wrapper.set_master_clock(*clocking) 200 return synchronization 201 202 def do_stop(failure): 203 for wrapper in wrappers: 204 wrapper.stop() 205 return failure 206 207 for wrapper in wrappers: 208 if not wrapper.instantiate(): 209 return defer.fail(errors.ComponentStartError) 210 d = provide_clock() 211 for wrapper in wrappers: 212 d.addCallback(do_start, wrapper) 213 d.addErrback(do_stop) 214 return d 215
216 -def main(args):
217 from flumotion.common import setup 218 setup.setupPackagePath() 219 from flumotion.configure import configure 220 log.debug('launch', 'Running Flumotion version %s' % 221 configure.version) 222 import twisted.copyright 223 log.debug('launch', 'Running against Twisted version %s' % 224 twisted.copyright.version) 225 from flumotion.project import project 226 for p in project.list(): 227 log.debug('launch', 'Registered project %s version %s' % ( 228 p, project.get(p, 'version'))) 229 230 parser = OptionParser(domain="flumotion-launch") 231 232 log.debug('launch', 'Parsing arguments (%r)' % ', '.join(args)) 233 options, args = parser.parse_args(args) 234 235 # verbose overrides --debug 236 if options.verbose: 237 log.setFluDebug("*:3") 238 239 # handle all options 240 if options.version: 241 print common.version("flumotion-launch") 242 return 0 243 244 if options.debug: 245 log.setFluDebug(options.debug) 246 247 # note parser versus parse 248 configs = parse.parse_args(args[1:]) 249 250 # load the modules, make the component 251 wrappers = [ComponentWrapper(config) for config in configs] 252 253 # make socket pairs 254 fds = make_pipes(wrappers) 255 256 reactor.running = False 257 reactor.failure = False 258 reactor.callLater(0, lambda: setattr(reactor, 'running', True)) 259 260 d = start_components(wrappers, fds) 261 262 def errback(failure): 263 log.debug('launch', log.getFailureMessage(failure)) 264 print "Error occurred: %s" % failure.getErrorMessage() 265 failure.printDetailedTraceback() 266 reactor.failure = True 267 if reactor.running: 268 print "Stopping reactor." 269 reactor.stop()
270 d.addErrback(errback) 271 272 if not reactor.failure: 273 print 'Running the reactor. Press Ctrl-C to exit.' 274 275 log.debug('launch', 'Starting reactor') 276 reactor.run() 277 278 log.debug('launch', 'Reactor stopped') 279 280 if reactor.failure: 281 return 1 282 else: 283 return 0 284