1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 worker-side objects to handle worker clients
24 """
25
26 import os
27 import signal
28 import sys
29
30 from twisted.internet import defer, reactor
31
32 from flumotion.common import errors, log
33 from flumotion.common import common, messages
34
35 N_ = messages.N_
36 T_ = messages.gettexter('flumotion')
37
38 from flumotion.worker import base
39
40
43 def bootstrap(*args):
44 return self.mindCallRemote('bootstrap', *args)
45
46 def create(_, job):
47 self.debug("asking job to create component with avatarId %s,"
48 " type %s", job.avatarId, job.type)
49 return self.mindCallRemote('create', job.avatarId, job.type,
50 job.moduleName, job.methodName,
51 job.nice, job.conf)
52
53 def success(_, avatarId):
54 self.debug('job started component with avatarId %s',
55 avatarId)
56
57 self._heaven._startSet.createSuccess(avatarId)
58
59 def error(failure, job):
60 msg = log.getFailureMessage(failure)
61 if failure.check(errors.ComponentCreateError):
62 self.warning('could not create component %s of type %s:'
63 ' %s', job.avatarId, job.type, msg)
64 else:
65 self.warning('unhandled error creating component %s: %s',
66 job.avatarId, msg)
67
68 self._heaven._startSet.createFailed(job.avatarId, failure)
69
70 def gotPid(pid):
71 self.pid = pid
72 info = self._heaven.getManagerConnectionInfo()
73 if info.use_ssl:
74 transport = 'ssl'
75 else:
76 transport = 'tcp'
77 job = self._heaven.getJobInfo(pid)
78 workerName = self._heaven.getWorkerName()
79
80 d = bootstrap(workerName, info.host, info.port, transport,
81 info.authenticator, job.bundles)
82 d.addCallback(create, job)
83 d.addCallback(success, job.avatarId)
84 d.addErrback(error, job)
85 return d
86 d = self.mindCallRemote("getPid")
87 d.addCallback(gotPid)
88 return d
89
91 """
92 returns: a deferred marking completed stop.
93 """
94 if not self.mind:
95 self.debug('already logged out')
96 return defer.succeed(None)
97 else:
98 self.debug('stopping')
99 return self.mindCallRemote('stop')
100
101 - def sendFeed(self, feedName, fd, eaterId):
102 """
103 Tell the feeder to send the given feed to the given fd.
104
105 @returns: whether the fd was successfully handed off to the component.
106 """
107 self.debug('Sending FD %d to component job to feed %s to fd',
108 fd, feedName)
109
110
111
112
113
114 if self.mind:
115 message = "sendFeed %s %s" % (feedName, eaterId)
116 return self._sendFileDescriptor(fd, message)
117 else:
118 self.debug('my mind is gone, trigger disconnect')
119 return False
120
122 """
123 Tell the feeder to receive the given feed from the given fd.
124
125 @returns: whether the fd was successfully handed off to the component.
126 """
127 self.debug('Sending FD %d to component job to eat %s from fd',
128 fd, eaterAlias)
129
130
131 if self.mind:
132 message = "receiveFeed %s %s" % (eaterAlias, feedId)
133 return self._sendFileDescriptor(fd, message)
134 else:
135 self.debug('my mind is gone, trigger disconnect')
136 return False
137
139 """
140 This notification from the job process will be fired when it is
141 shutting down, so that although the process might still be
142 around, we know it's OK to accept new start requests for this
143 avatar ID.
144 """
145 self.info("component %s shutting down cleanly", self.avatarId)
146
147 self._heaven._startSet.shutdownStart(self.avatarId)
148
149
151 __slots__ = ('conf',)
152
153 - def __init__(self, pid, avatarId, type, moduleName, methodName,
154 nice, bundles, conf):
158
159
161 avatarClass = ComponentJobAvatar
162
164 """
165 Gets the L{flumotion.common.connection.PBConnectionInfo}
166 describing how to connect to the manager.
167
168 @rtype: L{flumotion.common.connection.PBConnectionInfo}
169 """
170 return self.brain.managerConnectionInfo
171
172 - def spawn(self, avatarId, type, moduleName, methodName, nice,
173 bundles, conf):
174 """
175 Spawn a new job.
176
177 This will spawn a new flumotion-job process, running under the
178 requested nice level. When the job logs in, it will be told to
179 load bundles and run a function, which is expected to return a
180 component.
181
182 @param avatarId: avatarId the component should use to log in
183 @type avatarId: str
184 @param type: type of component to start
185 @type type: str
186 @param moduleName: name of the module to create the component from
187 @type moduleName: str
188 @param methodName: the factory method to use to create the component
189 @type methodName: str
190 @param nice: nice level
191 @type nice: int
192 @param bundles: ordered list of (bundleName, bundlePath) for this
193 component
194 @type bundles: list of (str, str)
195 @param conf: component configuration
196 @type conf: dict
197 """
198 d = self._startSet.createStart(avatarId)
199
200 p = base.JobProcessProtocol(self, avatarId, self._startSet)
201 executable = os.path.join(os.path.dirname(sys.argv[0]), 'flumotion-job')
202 if not os.path.exists(executable):
203 self.error("Trying to spawn job process, but '%s' does not "
204 "exist", executable)
205 argv = [executable, avatarId, self._socketPath]
206
207 realexecutable = executable
208
209
210
211
212
213 if os.environ.has_key('FLU_VALGRIND_JOB'):
214 jobnames = os.environ['FLU_VALGRIND_JOB'].split(',')
215 if avatarId in jobnames:
216 realexecutable = 'valgrind'
217
218
219
220 argv = ['valgrind', '--leak-check=full', '--num-callers=24',
221 '--leak-resolution=high', '--show-reachable=yes',
222 'python'] + argv
223
224 childFDs = {0: 0, 1: 1, 2: 2}
225 env = {}
226 env.update(os.environ)
227 env['FLU_DEBUG'] = log.getDebug()
228 process = reactor.spawnProcess(p, realexecutable, env=env, args=argv,
229 childFDs=childFDs)
230
231 p.setPid(process.pid)
232
233 self.addJobInfo(process.pid,
234 ComponentJobInfo(process.pid, avatarId, type,
235 moduleName, methodName, nice,
236 bundles, conf))
237 return d
238
239
247
248 d = self.mindCallRemote("getPid")
249 d.addCallback(gotPid)
250 return d
251
258
260 self.debug("job is stopping")
261 pass
262
263
265 avatarClass = CheckJobAvatar
266
267 _checkCount = 0
268 _timeout = 45
269
276
278 if self.jobPool:
279 job, expireDC = self.jobPool.pop(0)
280 expireDC.cancel()
281 self.debug('running check in already-running job %s',
282 job.avatarId)
283 return defer.succeed(job)
284
285 avatarId = 'check-%d' % (self._checkCount,)
286 self._checkCount += 1
287
288 self.debug('spawning new job %s to run a check', avatarId)
289 d = self._startSet.createStart(avatarId)
290
291 p = base.JobProcessProtocol(self, avatarId, self._startSet)
292 executable = os.path.join(os.path.dirname(sys.argv[0]), 'flumotion-job')
293 argv = [executable, avatarId, self._socketPath]
294
295 childFDs = {0: 0, 1: 1, 2: 2}
296 env = {}
297 env.update(os.environ)
298 env['FLU_DEBUG'] = log.getDebug()
299 process = reactor.spawnProcess(p, executable, env=env, args=argv,
300 childFDs=childFDs)
301
302 p.setPid(process.pid)
303 jobInfo = base.JobInfo(process.pid, avatarId, type, None, None,
304 None, [])
305 self._jobInfos[process.pid] = jobInfo
306
307 def haveMind(_):
308
309 return self.avatars[avatarId]
310
311 d.addCallback(haveMind)
312 return d
313
314 - def runCheck(self, bundles, moduleName, methodName, *args, **kwargs):
319
320 def timeout(sig):
321 self.killJobByPid(process.pid, sig)
322
323 def haveResult(res):
324 if not termtimeout.active():
325 self.info("Discarding error %s", res)
326 res = messages.Result()
327 res.add(messages.Error(T_(N_("Check timed out.")),
328 debug=("Timed out running %s."%methodName)))
329 else:
330 def expire():
331 if (job, expireDC) in self.jobPool:
332 self.debug('stopping idle check job process %s',
333 job.avatarId)
334 self.jobPool.remove((job, expireDC))
335 job.mindCallRemote('stop')
336 expireDC = reactor.callLater(self._timeout, expire)
337 self.jobPool.append((job, expireDC))
338
339 if termtimeout.active():
340 termtimeout.cancel()
341 if killtimeout.active():
342 killtimeout.cancel()
343 return res
344
345
346
347 termtimeout = reactor.callLater(self._timeout, timeout,
348 signal.SIGTERM)
349 killtimeout = reactor.callLater(self._timeout, timeout,
350 signal.SIGKILL)
351
352 d = job.mindCallRemote('bootstrap', self.getWorkerName(),
353 None, None, None, None, bundles)
354 d.addCallback(callProc)
355 d.addCallbacks(haveResult, haveResult)
356 return d
357
358 d = self.getCheckJobFromPool()
359 d.addCallback(haveJob)
360
361 return d
362