1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 """
24 Flumotion-launch: A gst-launch analog for Flumotion.
25
26 The goal of flumotion-launch is to provide an easy way for testing
27 flumotion components, without involving much of Flumotion's core code.
28
29 Flumotion-launch takes a terse gst-launch-like syntax, translates that
30 into a component graph, and starts the components. An example would be::
31
32 flumotion-launch videotest ! theora-encoder ! ogg-muxer ! http-streamer
33
34 You can also set properties::
35
36 flumotion-launch videotest framerate=15/2
37
38 You can link specific feeders as well::
39
40 flumotion-launch firewire .audio ! vorbis-encoder
41 flumotion-launch firewire firewire0.audio ! vorbis-encoder
42
43 Components can be backreferenced using their names::
44
45 flumotion-launch videotest audiotest videotest0. ! ogg-muxer \
46 audiotest0. ! ogg-muxer0.
47
48 In addition, components can have plugs::
49
50 flumotion-launch http-streamer /apachelogger,logfile=/dev/stdout
51
52 Flumotion-launch explicitly avoids much of Flumotion's core logic. It
53 does not import flumotion.manager, flumotion.admin, or flumotion.worker.
54 There is no depgraph, no feed server, no job process. Although it might
55 be useful in the future to add a way to use the standard interfaces to
56 start components via admin, manager, worker, and job instances, this
57 low-level interface is useful in debugging problems and should be kept.
58 """
59
60
61 import os
62 import sys
63
64 from twisted.python import reflect
65 from twisted.internet import reactor, defer
66
67 from flumotion.common import log, common, registry, errors, messages
68 from flumotion.twisted import flavors
69 from flumotion.common.options import OptionParser
70
71 from flumotion.launch import parse
72
73 from gettext import gettext as _
74
# Translated, human-readable heading for each message severity level.
_headings = dict((
    (messages.ERROR, _('Error')),
    (messages.WARNING, _('Warning')),
    (messages.INFO, _('Note')),
))
80
81
83 sys.stderr.write(x + '\n')
84 raise SystemExit(1)
85
86
124
129
132
135
137 self.debug('feedToFD(feedName=%s, %d)' % (feedName, fd))
138 return self.component.feedToFD(feedName, fd, os.close)
139
140 - def eatFromFD(self, eaterAlias, feedId, fd):
144
146 fds = {}
147 wrappersByName = dict([(wrapper.name, wrapper)
148 for wrapper in wrappers])
def starter(wrapper, feedName, write):
    """Build a zero-argument callable that starts feeding into a pipe.

    The arguments are captured here, at creation time, so that callables
    created on successive loop iterations each keep their own wrapper,
    feed name and file descriptor.

    Returns a callable which, when invoked, returns the result of
    ``wrapper.feedToFD(feedName, write)``.
    """
    def start_feeding():
        return wrapper.feedToFD(feedName, write)
    return start_feeding
151 for wrapper in wrappers:
152 eaters = wrapper.config.get('eater', {})
153 for eaterName in eaters:
154 for feedId, eaterAlias in eaters[eaterName]:
155 compName, feederName = common.parseFeedId(feedId)
156 read, write = os.pipe()
157 log.debug('launch', '%s: read from fd %d, write to fd %d',
158 feedId, read, write)
159 start = starter(wrappersByName[compName], feederName, write)
160 fds[feedId] = (read, start)
161 return fds
162
164
165
def provide_clock():
    """Find the clock master among the wrappers and ask it for a clock.

    Uses ``wrappers`` from the enclosing scope.  A wrapper takes part in
    clock synchronisation when its ``clock-master`` config entry is set;
    the master is the participant whose ``clock-master`` entry equals its
    own ``avatarId``.

    Returns a Deferred firing with a ``(need_sync, clocking)`` pair:
    ``need_sync`` is the list of participating wrappers without the
    master, and ``clocking`` is whatever the master's
    ``provideMasterClock`` Deferred fires with.  When no wrapper asks for
    a clock master, the Deferred fires with ``(None, None)``.

    Raises AssertionError when wrappers ask for a clock master but none
    of them is that master.
    """
    need_sync = [x for x in wrappers if x.config['clock-master']]
    if not need_sync:
        return defer.succeed((None, None))

    master = None
    for candidate in need_sync:
        if candidate.config['clock-master'] == candidate.config['avatarId']:
            master = candidate
            break
    # Raised explicitly rather than with a bare ``assert`` so the check
    # survives ``python -O`` and reports what went wrong, instead of
    # surfacing later as a ValueError from need_sync.remove(None).
    if not master:
        raise AssertionError(
            'components request a clock master, but none of them is '
            'configured as that master')
    need_sync.remove(master)

    # NOTE(review): 7600 - 1 is hard-coded; presumably a port number for
    # the master clock -- confirm against provideMasterClock.
    d = master.provideMasterClock(7600 - 1)

    def addNeedSync(clocking):
        # Pair the clocking result with the wrappers that still need it.
        return need_sync, clocking
    d.addCallback(addNeedSync)
    return d
185
def do_start(synchronization, wrapper):
    """Connect one wrapper's eaters to their pipes and set its clock.

    Used as a Deferred callback, once per wrapper.  ``synchronization``
    is the ``(need_sync, clocking)`` pair produced by the clock setup; it
    is returned unchanged so the next callback in the chain receives the
    same value.  Uses ``fds`` from the enclosing scope, which maps a feed
    id to a ``(read fd, start callable)`` pair.
    """
    sync_group, clock_info = synchronization

    eater_map = wrapper.config.get('eater', {})
    for eater_name in eater_map:
        for feedId, eaterAlias in eater_map[eater_name]:
            read_fd, begin_feeding = fds[feedId]
            # Attach the reading end first, then start the feeder that
            # writes into the other end of the pipe.
            wrapper.eatFromFD(eaterAlias, feedId, read_fd)
            begin_feeding()

    # Only wrappers listed as needing synchronisation get the master
    # clock, and only when clocking data is actually available.
    if sync_group and wrapper in sync_group and clock_info:
        wrapper.set_master_clock(*clock_info)
    return synchronization
201
def do_stop(failure):
    """Errback: stop every wrapper, then pass the failure on unchanged.

    Uses ``wrappers`` from the enclosing scope.  Returning ``failure``
    keeps the Deferred in its error state for later errbacks.
    """
    for component in wrappers:
        component.stop()
    return failure
206
207 for wrapper in wrappers:
208 if not wrapper.instantiate():
209 return defer.fail(errors.ComponentStartError)
210 d = provide_clock()
211 for wrapper in wrappers:
212 d.addCallback(do_start, wrapper)
213 d.addErrback(do_stop)
214 return d
215
217 from flumotion.common import setup
218 setup.setupPackagePath()
219 from flumotion.configure import configure
220 log.debug('launch', 'Running Flumotion version %s' %
221 configure.version)
222 import twisted.copyright
223 log.debug('launch', 'Running against Twisted version %s' %
224 twisted.copyright.version)
225 from flumotion.project import project
226 for p in project.list():
227 log.debug('launch', 'Registered project %s version %s' % (
228 p, project.get(p, 'version')))
229
230 parser = OptionParser(domain="flumotion-launch")
231
232 log.debug('launch', 'Parsing arguments (%r)' % ', '.join(args))
233 options, args = parser.parse_args(args)
234
235
236 if options.verbose:
237 log.setFluDebug("*:3")
238
239
240 if options.version:
241 print common.version("flumotion-launch")
242 return 0
243
244 if options.debug:
245 log.setFluDebug(options.debug)
246
247
248 configs = parse.parse_args(args[1:])
249
250
251 wrappers = [ComponentWrapper(config) for config in configs]
252
253
254 fds = make_pipes(wrappers)
255
256 reactor.running = False
257 reactor.failure = False
258 reactor.callLater(0, lambda: setattr(reactor, 'running', True))
259
260 d = start_components(wrappers, fds)
261
def errback(failure):
    """Report a startup failure and flag it on the reactor.

    Logs the failure, prints its error message and a detailed traceback,
    and sets ``reactor.failure`` to True.  The reactor is stopped only
    when ``reactor.running`` is already true.  Returns None, so the
    failure is consumed here.
    """
    log.debug('launch', log.getFailureMessage(failure))
    # print() with a single parenthesised argument produces the same
    # output on Python 2 and is also valid syntax on Python 3.
    print("Error occurred: %s" % failure.getErrorMessage())
    failure.printDetailedTraceback()
    reactor.failure = True
    if reactor.running:
        print("Stopping reactor.")
        reactor.stop()
270 d.addErrback(errback)
271
272 if not reactor.failure:
273 print 'Running the reactor. Press Ctrl-C to exit.'
274
275 log.debug('launch', 'Starting reactor')
276 reactor.run()
277
278 log.debug('launch', 'Reactor stopped')
279
280 if reactor.failure:
281 return 1
282 else:
283 return 0
284