1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 the job-side half of the worker-job connection
24 """
25
26 import os
27 import resource
28 import sys
29
30
31
32
33
34
35 from twisted.cred import credentials
36 from twisted.internet import reactor, defer
37 from twisted.python import failure
38 from twisted.spread import pb
39 from zope.interface import implements
40
41 from flumotion.common import config, errors, interfaces, log, registry, keycards
42 from flumotion.common import medium, package
43 from flumotion.common.reflectcall import createComponent, reflectCallCatching
44 from flumotion.component import component
45
46 from flumotion.twisted import fdserver
47 from flumotion.twisted import pb as fpb
48 from flumotion.twisted import defer as fdefer
49
51 """
52 I am a medium between the job and the worker's job avatar.
53 I live in the job process.
54
55 @cvar component: the component this is a medium for; created as part of
56 L{remote_create}
57 @type component: L{flumotion.component.component.BaseComponent}
58 """
59 logCategory = 'jobmedium'
60 remoteLogName = 'jobavatar'
61
62 implements(interfaces.IJobMedium)
63
65 self.avatarId = None
66 self.logName = None
67 self.component = None
68
69 self._workerName = None
70 self._managerHost = None
71 self._managerPort = None
72 self._managerTransport = None
73 self._managerKeycard = None
74 self._componentClientFactory = None
75
76 self._hasStoppedReactor = False
77
78
79 - def remote_bootstrap(self, workerName, host, port, transport, authenticator,
80 packagePaths):
81 """
82 I receive the information on how to connect to the manager. I also set
83 up package paths to be able to run the component.
84
85 Called by the worker's JobAvatar.
86
87 @param workerName: the name of the worker running this job
88 @type workerName: str
89 @param host: the host that is running the manager
90 @type host: str
91 @param port: port on which the manager is listening
92 @type port: int
93 @param transport: 'tcp' or 'ssl'
94 @type transport: str
95 @param authenticator: remote reference to the worker-side authenticator
96 @type authenticator: L{twisted.spread.pb.RemoteReference} to a
97 L{flumotion.twisted.pb.Authenticator}
98 @param packagePaths: ordered list of
99 (package name, package path) tuples
100 @type packagePaths: list of (str, str)
101 """
102 self._workerName = workerName
103 self._managerHost = host
104 self._managerPort = port
105 self._managerTransport = transport
106 if authenticator:
107 self._authenticator = fpb.RemoteAuthenticator(authenticator)
108 else:
109 self.debug('no authenticator, will not be able to log '
110 'into manager')
111 self._authenticator = None
112
113 packager = package.getPackager()
114 for name, path in packagePaths:
115 self.debug('registering package path for %s' % name)
116 self.log('... from path %s' % path)
117 packager.registerPackagePath(path, name)
118
121
123 """
124 I am called on by the worker's JobAvatar to run a function,
125 normally on behalf of the flumotion wizard.
126
127 @param moduleName: name of the module containing the function
128 @type moduleName: str
129 @param methodName: the method to run
130 @type methodName: str
131 @param args: args to pass to the method
132 @type args: tuple
133 @param kwargs: kwargs to pass to the method
134 @type kwargs: dict
135
136 @returns: the result of invoking the method
137 """
138 self.info('Running %s.%s(*%r, **%r)' % (moduleName, methodName,
139 args, kwargs))
140
141 self._enableCoreDumps()
142
143 return reflectCallCatching(errors.RemoteRunError, moduleName,
144 methodName, *args, **kwargs)
145
146 - def remote_create(self, avatarId, type, moduleName, methodName,
147 nice, conf):
148 """
149 I am called on by the worker's JobAvatar to create a component.
150
151 @param avatarId: avatarId for component to log in to manager
152 @type avatarId: str
153 @param type: type of component to start
154 @type type: str
155 @param moduleName: name of the module to create the component from
156 @type moduleName: str
157 @param methodName: the factory method to use to create the component
158 @type methodName: str
159 @param nice: the nice level
160 @type nice: int
161 @param conf: the component configuration
162 @type conf: dict
163 """
164 self.avatarId = avatarId
165 self.logName = avatarId
166
167 self.component = self._createComponent(avatarId, type, moduleName,
168 methodName, nice, conf)
169 self.component.setShutdownHook(self._componentStopped)
170
174
181
196
197
199 """
200 Shut down the job process completely, cleaning up the component
201 so the reactor can be left from.
202 """
203 if self._hasStoppedReactor:
204 self.debug("Not stopping reactor again, already shutting down")
205 else:
206 self._hasStoppedReactor = True
207 self.info("Stopping reactor in job process")
208 reactor.stop()
209
211 if not nice:
212 return
213
214 try:
215 os.nice(nice)
216 except OSError, e:
217 self.warning('Failed to set nice level: %s' % str(e))
218 else:
219 self.debug('Nice level set to %d' % nice)
220
222 soft, hard = resource.getrlimit(resource.RLIMIT_CORE)
223 if hard != resource.RLIM_INFINITY:
224 self.warning('Could not set unlimited core dump sizes, '
225 'setting to %d instead' % hard)
226 else:
227 self.debug('Enabling core dumps of unlimited size')
228
229 resource.setrlimit(resource.RLIMIT_CORE, (hard, hard))
230
231 - def _createComponent(self, avatarId, type, moduleName, methodName,
232 nice, conf):
233 """
234 Create a component of the given type.
235 Log in to the manager with the given avatarId.
236
237 @param avatarId: avatarId component will use to log in to manager
238 @type avatarId: str
239 @param type: type of component to start
240 @type type: str
241 @param moduleName: name of the module that contains the entry point
242 @type moduleName: str
243 @param methodName: name of the factory method to create the component
244 @type methodName: str
245 @param nice: the nice level to run with
246 @type nice: int
247 @param conf: the component configuration
248 @type conf: dict
249 """
250 self.info('Creating component "%s" of type "%s"', avatarId, type)
251
252 self._setNice(nice)
253 self._enableCoreDumps()
254
255 try:
256 comp = createComponent(moduleName, methodName, conf)
257 except Exception, e:
258 msg = "Exception %s during createComponent: %s" % (
259 e.__class__.__name__, " ".join(e.args))
260
261
262 if isinstance(e, errors.ComponentCreateError):
263 msg = e.args[0]
264 self.warning(
265 "raising ComponentCreateError(%s) and stopping job" % msg)
266
267
268
269
270
271
272
273 reactor.callLater(0.1, self.shutdown)
274 raise errors.ComponentCreateError(msg)
275
276 comp.setWorkerName(self._workerName)
277
278
279 self.debug('creating ComponentClientFactory')
280 managerClientFactory = component.ComponentClientFactory(comp)
281 self._componentClientFactory = managerClientFactory
282 self.debug('created ComponentClientFactory %r' % managerClientFactory)
283 self._authenticator.avatarId = avatarId
284 managerClientFactory.startLogin(self._authenticator)
285
286 host = self._managerHost
287 port = self._managerPort
288 transport = self._managerTransport
289 self.debug('logging in with authenticator %r' % self._authenticator)
290 if transport == "ssl":
291 from flumotion.common import common
292 common.assertSSLAvailable()
293 from twisted.internet import ssl
294 self.info('Connecting to manager %s:%d with SSL' % (host, port))
295 reactor.connectSSL(host, port, managerClientFactory,
296 ssl.ClientContextFactory())
297 elif transport == "tcp":
298 self.info('Connecting to manager %s:%d with TCP' % (host, port))
299 reactor.connectTCP(host, port, managerClientFactory)
300 else:
301 self.warning('Unknown transport protocol %s' % self._managerTransport)
302
303 return comp
304
306 """
307 A pb.Broker subclass that handles FDs being passed (with associated data)
308 over the same connection as the normal PB data stream.
309 When an FD is seen, the FD should be added to a given eater or feeder
310 element.
311 """
312 - def __init__(self, connectionClass, **kwargs):
313 """
314 @param connectionClass: a subclass of L{twisted.internet.tcp.Connection}
315 """
316 pb.Broker.__init__(self, **kwargs)
317
318 self._connectionClass = connectionClass
319
321
322 self.debug('received fds %r, message %r' % (fds, message))
323 if message.startswith('sendFeed '):
324 def parseargs(_, feedName, eaterId=None):
325 return feedName, eaterId
326 feedName, eaterId = parseargs(*message.split(' '))
327 self.factory.medium.component.feedToFD(feedName, fds[0],
328 os.close, eaterId)
329 elif message.startswith('receiveFeed '):
330 def parseargs2(_, eaterAlias, feedId=None):
331 return eaterAlias, feedId
332 eaterAlias, feedId = parseargs2(*message.split(' '))
333 self.factory.medium.component.eatFromFD(eaterAlias, feedId,
334 fds[0])
335 elif message == 'redirectStdout':
336 self.debug('told to rotate stdout to fd %d', fds[0])
337 os.dup2(fds[0], sys.stdout.fileno())
338 os.close(fds[0])
339 self.debug('rotated stdout')
340 elif message == 'redirectStderr':
341 self.debug('told to rotate stderr to fd %d', fds[0])
342 os.dup2(fds[0], sys.stderr.fileno())
343 os.close(fds[0])
344 self.info('rotated stderr')
345 else:
346 self.warning('Unknown message received: %r' % message)
347
349 """
350 I am a client factory that logs in to the WorkerBrain.
351 I live in the flumotion-job process spawned by the worker.
352
353 @cvar medium: the medium for the JobHeaven to access us through
354 @type medium: L{JobMedium}
355 """
356 logCategory = "job"
357 perspectiveInterface = interfaces.IJobMedium
358
372
373
378
379
380 - def login(self, username):
381 def haveReference(remoteReference):
382 self.info('Logged in to worker')
383 self.debug('perspective %r connected', remoteReference)
384 self.medium.setRemoteReference(remoteReference)
385
386 self.info('Logging in to worker')
387 d = pb.PBClientFactory.login(self,
388 credentials.UsernamePassword(username, ''),
389 self.medium)
390 d.addCallback(haveReference)
391 return d
392
393
394
395
396
401