Package flumotion :: Package worker :: Module job
[hide private]

Source Code for Module flumotion.worker.job

  1  # -*- Mode: Python; test-case-name:flumotion.test.test_worker_worker -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  worker-side objects to handle worker clients 
 24  """ 
 25   
 26  import os 
 27  import signal 
 28  import sys 
 29   
 30  from twisted.internet import defer, reactor 
 31   
 32  from flumotion.common import errors, log 
 33  from flumotion.common import common, messages 
 34   
 35  N_ = messages.N_ 
 36  T_ = messages.gettexter('flumotion') 
 37   
 38  from flumotion.worker import base 
 39   
 40   
class ComponentJobAvatar(base.BaseJobAvatar):
    """
    Worker-side avatar for a job process that hosts a component.

    It bootstraps the job, asks it to create the component, and hands
    file descriptors for feeds over to it.
    """

    def haveMind(self):
        """
        Ask the job for its pid, then bootstrap it and have it create
        the component described by the job info registered for that pid.

        Presumably invoked by the base avatar once the job has logged
        in; verify against L{base.BaseJobAvatar}.

        @returns: a deferred firing once the component creation has
                  either succeeded or failed; both outcomes are reported
                  to the heaven's start set and the failure is consumed.
        """
        def pidReceived(pid):
            self.pid = pid
            heaven = self._heaven
            connection = heaven.getManagerConnectionInfo()
            transport = 'tcp'
            if connection.use_ssl:
                transport = 'ssl'
            jobInfo = heaven.getJobInfo(pid)
            workerName = heaven.getWorkerName()

            def requestCreate(_):
                self.debug("asking job to create component with "
                           "avatarId %s, type %s",
                           jobInfo.avatarId, jobInfo.type)
                return self.mindCallRemote(
                    'create', jobInfo.avatarId, jobInfo.type,
                    jobInfo.moduleName, jobInfo.methodName,
                    jobInfo.nice, jobInfo.conf)

            def created(_, avatarId):
                self.debug('job started component with avatarId %s',
                           avatarId)
                # FIXME: drills down too much?
                self._heaven._startSet.createSuccess(avatarId)

            def creationFailed(failure):
                reason = log.getFailureMessage(failure)
                if not failure.check(errors.ComponentCreateError):
                    self.warning(
                        'unhandled error creating component %s: %s',
                        jobInfo.avatarId, reason)
                else:
                    self.warning(
                        'could not create component %s of type %s: %s',
                        jobInfo.avatarId, jobInfo.type, reason)
                # FIXME: drills down too much?
                self._heaven._startSet.createFailed(jobInfo.avatarId,
                                                    failure)

            # bootstrap -> create -> report success; any failure along
            # the way ends up in creationFailed
            chain = self.mindCallRemote(
                'bootstrap', workerName, connection.host, connection.port,
                transport, connection.authenticator, jobInfo.bundles)
            chain.addCallback(requestCreate)
            chain.addCallback(created, jobInfo.avatarId)
            chain.addErrback(creationFailed)
            return chain

        pidRequest = self.mindCallRemote("getPid")
        pidRequest.addCallback(pidReceived)
        return pidRequest

    def stop(self):
        """
        Ask the job to stop, unless it has already logged out.

        @returns: a deferred marking completed stop.
        """
        if self.mind:
            self.debug('stopping')
            return self.mindCallRemote('stop')

        self.debug('already logged out')
        return defer.succeed(None)

    def sendFeed(self, feedName, fd, eaterId):
        """
        Tell the feeder to send the given feed to the given fd.

        @returns: whether the fd was successfully handed off to the
                  component.
        """
        self.debug('Sending FD %d to component job to feed %s to fd',
                   fd, feedName)

        # The component may have logged out already, leaving us without
        # a mind.  Checking for that any earlier would only introduce a
        # race, so it is handled here: returning False triggers a
        # disconnect on the fd.
        if not self.mind:
            self.debug('my mind is gone, trigger disconnect')
            return False

        return self._sendFileDescriptor(
            fd, "sendFeed %s %s" % (feedName, eaterId))

    def receiveFeed(self, eaterAlias, fd, feedId):
        """
        Tell the feeder to receive the given feed from the given fd.

        @returns: whether the fd was successfully handed off to the
                  component.
        """
        self.debug('Sending FD %d to component job to eat %s from fd',
                   fd, eaterAlias)

        # A missing mind is handled here for the same reason as in
        # sendFeed: an earlier check would only introduce a race.
        if not self.mind:
            self.debug('my mind is gone, trigger disconnect')
            return False

        return self._sendFileDescriptor(
            fd, "receiveFeed %s %s" % (eaterAlias, feedId))

    def perspective_cleanShutdown(self):
        """
        Notification fired by the job process when it is shutting down.

        Although the process might still be around, from this point on
        it is OK to accept new start requests for this avatar ID.
        """
        avatarId = self.avatarId
        self.info("component %s shutting down cleanly", avatarId)
        # FIXME: drills down too much?
        self._heaven._startSet.shutdownStart(avatarId)
148 149
150 -class ComponentJobInfo(base.JobInfo):
151 __slots__ = ('conf',) 152
153 - def __init__(self, pid, avatarId, type, moduleName, methodName, 154 nice, bundles, conf):
158 159
class ComponentJobHeaven(base.BaseJobHeaven):
    """
    Heaven that spawns flumotion-job processes hosting components and
    keeps a L{ComponentJobInfo} for each of them.
    """
    avatarClass = ComponentJobAvatar

    def getManagerConnectionInfo(self):
        """
        Gets the L{flumotion.common.connection.PBConnectionInfo}
        describing how to connect to the manager.

        @rtype: L{flumotion.common.connection.PBConnectionInfo}
        """
        return self.brain.managerConnectionInfo

    def spawn(self, avatarId, type, moduleName, methodName, nice,
              bundles, conf):
        """
        Spawn a new job.

        This will spawn a new flumotion-job process, running under the
        requested nice level.  When the job logs in, it will be told to
        load bundles and run a function, which is expected to return a
        component.

        @param avatarId:   avatarId the component should use to log in
        @type  avatarId:   str
        @param type:       type of component to start
        @type  type:       str
        @param moduleName: name of the module to create the component from
        @type  moduleName: str
        @param methodName: the factory method to use to create the component
        @type  methodName: str
        @param nice:       nice level
        @type  nice:       int
        @param bundles:    ordered list of (bundleName, bundlePath) for this
                           component
        @type  bundles:    list of (str, str)
        @param conf:       component configuration
        @type  conf:       dict

        @returns: the deferred handed out by the start set's
                  createStart() for this avatarId.
        """
        d = self._startSet.createStart(avatarId)

        p = base.JobProcessProtocol(self, avatarId, self._startSet)
        # flumotion-job is expected to live next to the running script
        executable = os.path.join(os.path.dirname(sys.argv[0]),
                                  'flumotion-job')
        if not os.path.exists(executable):
            # NOTE(review): presumably self.error() does not return
            # (fatal log); confirm against flumotion.common.log.
            self.error("Trying to spawn job process, but '%s' does not "
                       "exist", executable)
        argv = [executable, avatarId, self._socketPath]

        realexecutable = executable

        # Run some jobs under valgrind, optionally.  Would be nice to have
        # the arguments to run it with configurable, but this'll do for now.
        # FLU_VALGRIND_JOB takes a comma-separated list of full component
        # avatar IDs.
        # The 'in' operator is used rather than dict.has_key(), which no
        # longer exists in Python 3.
        if 'FLU_VALGRIND_JOB' in os.environ:
            jobnames = os.environ['FLU_VALGRIND_JOB'].split(',')
            if avatarId in jobnames:
                realexecutable = 'valgrind'
                # We can't just valgrind flumotion-job, we have to valgrind
                # python running flumotion-job, otherwise we'd need
                # --trace-children (not quite sure why), which we don't want
                argv = ['valgrind', '--leak-check=full', '--num-callers=24',
                        '--leak-resolution=high', '--show-reachable=yes',
                        'python'] + argv

        # the child shares stdin/stdout/stderr with this process
        childFDs = {0: 0, 1: 1, 2: 2}
        env = {}
        env.update(os.environ)
        env['FLU_DEBUG'] = log.getDebug()
        process = reactor.spawnProcess(p, realexecutable, env=env, args=argv,
                                       childFDs=childFDs)

        p.setPid(process.pid)

        self.addJobInfo(process.pid,
                        ComponentJobInfo(process.pid, avatarId, type,
                                         moduleName, methodName, nice,
                                         bundles, conf))
        return d
238 239
class CheckJobAvatar(base.BaseJobAvatar):
    """
    Worker-side avatar for a job process that only runs checks.
    """

    def haveMind(self):
        """
        Ask the job for its pid, remember it, and report the job as
        successfully created to the heaven's start set.

        @returns: a deferred firing once the pid has been received and
                  the start set has been notified.
        """
        # FIXME: drills down too much?
        def gotPid(pid):
            self.pid = pid
            job = self._heaven.getJobInfo(pid)
            self._heaven._startSet.createSuccess(job.avatarId)

        d = self.mindCallRemote("getPid")
        d.addCallback(gotPid)
        return d

    def stop(self):
        """
        Mark the job as shutting down and send it SIGTERM.

        @returns: a deferred, as documented and as
                  L{ComponentJobAvatar.stop} does.  It has already fired
                  when returned: it only marks that the signal was sent,
                  the process itself may still be exiting.
        """
        self._heaven._startSet.shutdownStart(self.avatarId)
        self._heaven.killJob(self.avatarId, signal.SIGTERM)
        # Previously nothing was returned, contradicting the documented
        # contract and breaking any caller that chains on the result.
        return defer.succeed(None)

    def perspective_cleanShutdown(self):
        """
        Notification from the job process that it is stopping; it is
        only logged.
        """
        self.debug("job is stopping")
262 263
class CheckJobHeaven(base.BaseJobHeaven):
    """
    Heaven managing the job processes used to run checks.

    Jobs that finished a check in time are kept in a pool for a while
    so that they can be reused by a following check.
    """
    avatarClass = CheckJobAvatar

    # counter used to build a unique avatarId for every spawned check job
    _checkCount = 0
    # delay, in seconds, used both as the time limit for a running check
    # and as the idle time after which a pooled job is asked to stop
    _timeout = 45

    def __init__(self, brain):
        base.BaseJobHeaven.__init__(self, brain)

        # job processes that are available to do work (i.e. not actively
        # running checks), stored as (job avatar, expiry delayed call)
        self.jobPool = []

    def getCheckJobFromPool(self):
        """
        Get a job to run a check in, reusing an idle one from the pool
        when possible and spawning a new flumotion-job process otherwise.

        @returns: a deferred firing the job avatar.
        """
        if self.jobPool:
            job, expireDC = self.jobPool.pop(0)
            # the job is in use again, so it must not be expired
            expireDC.cancel()
            self.debug('running check in already-running job %s',
                       job.avatarId)
            return defer.succeed(job)

        avatarId = 'check-%d' % (self._checkCount, )
        self._checkCount += 1

        self.debug('spawning new job %s to run a check', avatarId)
        d = self._startSet.createStart(avatarId)

        p = base.JobProcessProtocol(self, avatarId, self._startSet)
        executable = os.path.join(os.path.dirname(sys.argv[0]),
                                  'flumotion-job')
        argv = [executable, avatarId, self._socketPath]

        # the child shares stdin/stdout/stderr with this process
        childFDs = {0: 0, 1: 1, 2: 2}
        env = {}
        env.update(os.environ)
        env['FLU_DEBUG'] = log.getDebug()
        process = reactor.spawnProcess(p, executable, env=env, args=argv,
                                       childFDs=childFDs)

        p.setPid(process.pid)
        # A check job has no component type.  There is no local named
        # 'type' here, so the previous code passed the builtin 'type'
        # by accident; a plain descriptive string is passed instead.
        jobInfo = base.JobInfo(process.pid, avatarId, 'check', None, None,
                               None, [])
        self._jobInfos[process.pid] = jobInfo

        def haveMind(_):
            # we have a mind, in theory; return the job avatar
            return self.avatars[avatarId]

        d.addCallback(haveMind)
        return d

    def runCheck(self, bundles, moduleName, methodName, *args, **kwargs):
        """
        Run the given function in a check job.

        The job is bootstrapped with the given bundles and then asked
        to run methodName from moduleName with the given arguments.  If
        no result arrives within self._timeout seconds the job is
        killed and a result carrying a "Check timed out." error is
        produced instead.

        @param bundles:    bundles handed to the job's bootstrap
        @param moduleName: name of the module containing the function
        @param methodName: name of the function to run

        @returns: a deferred firing the result of the remote function,
                  or a L{messages.Result} with an error on timeout.
        """
        def haveJob(job):
            def callProc(_):
                return job.mindCallRemote('runFunction', moduleName,
                                          methodName, *args, **kwargs)

            def timeout(sig):
                # The avatar's pid was stored by CheckJobAvatar.haveMind.
                # The previous code used 'process.pid', but no 'process'
                # exists in this scope, so the timeout raised NameError
                # instead of killing the job.
                self.killJobByPid(job.pid, sig)

            def haveResult(res):
                if not termtimeout.active():
                    # the TERM timeout already fired: whatever came back
                    # is the consequence of the job having been killed
                    self.info("Discarding error %s", res)
                    res = messages.Result()
                    res.add(messages.Error(T_(N_("Check timed out.")),
                        debug=("Timed out running %s."%methodName)))
                else:
                    # finished in time: keep the job around for reuse,
                    # and stop it if it stays idle for too long

                    def expire():
                        if (job, expireDC) in self.jobPool:
                            self.debug('stopping idle check job process %s',
                                       job.avatarId)
                            self.jobPool.remove((job, expireDC))
                            job.mindCallRemote('stop')
                    expireDC = reactor.callLater(self._timeout, expire)
                    self.jobPool.append((job, expireDC))

                if termtimeout.active():
                    termtimeout.cancel()
                if killtimeout.active():
                    killtimeout.cancel()
                return res

            # add callbacks and errbacks that kill the job

            # NOTE(review): both timeouts use the same delay, so SIGKILL
            # directly follows SIGTERM without a grace period; confirm
            # whether a longer delay was intended for SIGKILL.
            termtimeout = reactor.callLater(self._timeout, timeout,
                                            signal.SIGTERM)
            killtimeout = reactor.callLater(self._timeout, timeout,
                                            signal.SIGKILL)

            d = job.mindCallRemote('bootstrap', self.getWorkerName(),
                                   None, None, None, None, bundles)
            d.addCallback(callProc)
            # success and failure are both funnelled through haveResult
            d.addCallbacks(haveResult, haveResult)
            return d

        d = self.getCheckJobFromPool()
        d.addCallback(haveJob)

        return d