Package flumotion :: Package worker :: Module worker
[hide private]

Source Code for Module flumotion.worker.worker

  1  # -*- Mode: Python; test-case-name:flumotion.test.test_worker_worker -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  worker-side objects to handle worker clients 
 24  """ 
 25   
 26  import signal 
 27   
 28  from twisted.cred import portal 
 29  from twisted.internet import defer, reactor 
 30  from twisted.spread import pb 
 31  from twisted.internet import error 
 32  from zope.interface import implements 
 33   
 34  from flumotion.common import errors, interfaces, log, bundleclient 
 35  from flumotion.common import common, medium, messages, worker 
 36  from flumotion.twisted import checkers, fdserver 
 37  from flumotion.twisted import pb as fpb 
 38  from flumotion.twisted import defer as fdefer 
 39  from flumotion.configure import configure 
 40  from flumotion.worker import medium, job, feedserver 
 41   
42 -class ProxyBouncer(log.Loggable):
43 logCategory = "proxybouncer" 44 45 """ 46 I am a bouncer that proxies authenticate calls to a remote FPB root 47 object. 48 """
49 - def __init__(self, remote):
50 """ 51 @param remote: an object that has .callRemote() 52 """ 53 self._remote = remote
54
55 - def getKeycardClasses(self):
56 """ 57 Call me before asking me to authenticate, so I know what I can 58 authenticate. 59 """ 60 return self._remote.callRemote('getKeycardClasses')
61
62 - def authenticate(self, keycard):
63 self.debug("Authenticating keycard %r against remote bouncer", 64 keycard) 65 return self._remote.callRemote('authenticate', None, keycard)
66 67 # Similar to Vishnu, but for worker related classes
68 -class WorkerBrain(log.Loggable):
69 """ 70 I am the main object in the worker process, managing jobs and everything 71 related. 72 I live in the main worker process. 73 74 @ivar authenticator: authenticator worker used to log in to manager 75 @type authenticator L{flumotion.twisted.pb.Authenticator} 76 @ivar medium: 77 @type medium: L{WorkerMedium} 78 @ivar jobHeaven: 79 @type jobHeaven: L{ComponentJobHeaven} 80 @ivar checkHeaven: 81 @type checkHeaven: L{CheckJobHeaven} 82 @ivar workerClientFactory: 83 @type workerClientFactory: L{WorkerClientFactory} 84 @ivar feedServerPort: TCP port the Feed Server is listening on 85 @type feedServerPort: int 86 """ 87 88 implements(interfaces.IFeedServerParent) 89 90 logCategory = 'workerbrain' 91
92 - def __init__(self, options):
93 """ 94 @param options: the optparsed dictionary of command-line options 95 @type options: an object with attributes 96 """ 97 self.options = options 98 self.workerName = options.name 99 100 # the last port is reserved for our FeedServer 101 if not self.options.randomFeederports: 102 self.ports = self.options.feederports[:-1] 103 else: 104 self.ports = [] 105 106 self.medium = medium.WorkerMedium(self) 107 108 # really should be componentJobHeaven, but this is shorter :) 109 self.jobHeaven = job.ComponentJobHeaven(self) 110 # for ephemeral checks 111 self.checkHeaven = job.CheckJobHeaven(self) 112 113 self.managerConnectionInfo = None 114 115 # it's possible we don't have a feed server, if we are 116 # configured to have 0 tcp ports; setup this in listen() 117 self.feedServer = None 118 119 self.stopping = False 120 reactor.addSystemEventTrigger('before', 'shutdown', 121 self.shutdownHandler) 122 self._installHUPHandler()
123
124 - def _installHUPHandler(self):
125 def sighup(signum, frame): 126 if self._oldHUPHandler: 127 self.log('got SIGHUP, calling previous handler %r', 128 self._oldHUPHandler) 129 self._oldHUPHandler(signum, frame) 130 self.debug('telling kids about new log file descriptors') 131 self.jobHeaven.rotateChildLogFDs()
132 133 handler = signal.signal(signal.SIGHUP, sighup) 134 if handler == signal.SIG_DFL or handler == signal.SIG_IGN: 135 self._oldHUPHandler = None 136 else: 137 self._oldHUPHandler = handler
138
139 - def listen(self):
140 """ 141 Start listening on FeedServer (incoming eater requests) and 142 JobServer (through which we communicate with our children) ports 143 144 @returns: True if we successfully listened on both ports 145 """ 146 # set up feed server if we have the feederports for it 147 try: 148 self.feedServer = self._makeFeedServer() 149 except error.CannotListenError, e: 150 self.warning("Failed to listen on feed server port: %r", e) 151 return False 152 153 try: 154 self.jobHeaven.listen() 155 except error.CannotListenError, e: 156 self.warning("Failed to listen on job server port: %r", e) 157 return False 158 159 try: 160 self.checkHeaven.listen() 161 except error.CannotListenError, e: 162 self.warning("Failed to listen on check server port: %r", e) 163 return False 164 165 return True
166
167 - def _makeFeedServer(self):
168 """ 169 @returns: L{flumotion.worker.feedserver.FeedServer} 170 """ 171 port = None 172 if self.options.randomFeederports: 173 port = 0 174 elif not self.options.feederports: 175 self.info('Not starting feed server because no port is ' 176 'configured') 177 return None 178 else: 179 port = self.options.feederports[-1] 180 181 return feedserver.FeedServer(self, ProxyBouncer(self), port)
182
183 - def login(self, managerConnectionInfo):
184 self.managerConnectionInfo = managerConnectionInfo 185 self.medium.startConnecting(managerConnectionInfo)
186
187 - def callRemote(self, methodName, *args, **kwargs):
188 return self.medium.callRemote(methodName, *args, **kwargs)
189
190 - def shutdownHandler(self):
191 if self.stopping: 192 self.warning("Already shutting down, ignoring shutdown request") 193 return 194 195 self.info("Reactor shutting down, stopping jobHeaven") 196 self.stopping = True 197 198 l = [self.jobHeaven.shutdown(), self.checkHeaven.shutdown()] 199 if self.feedServer: 200 l.append(self.feedServer.shutdown()) 201 # Don't fire this other than from a callLater 202 return fdefer.defer_call_later(defer.DeferredList(l))
203 204 ### These methods called by feed server
205 - def feedToFD(self, componentId, feedName, fd, eaterId):
206 """ 207 Called from the FeedAvatar to pass a file descriptor on to 208 the job running the component for this feeder. 209 210 @returns: whether the fd was successfully handed off to the component. 211 """ 212 if componentId not in self.jobHeaven.avatars: 213 self.warning("No such component %s running", componentId) 214 return False 215 216 avatar = self.jobHeaven.avatars[componentId] 217 return avatar.sendFeed(feedName, fd, eaterId)
218
219 - def eatFromFD(self, componentId, eaterAlias, fd, feedId):
220 """ 221 Called from the FeedAvatar to pass a file descriptor on to 222 the job running the given component. 223 224 @returns: whether the fd was successfully handed off to the component. 225 """ 226 if componentId not in self.jobHeaven.avatars: 227 self.warning("No such component %s running", componentId) 228 return False 229 230 avatar = self.jobHeaven.avatars[componentId] 231 return avatar.receiveFeed(eaterAlias, fd, feedId)
232 233 ### these methods called by WorkerMedium
234 - def getPorts(self):
235 return self.ports, self.options.randomFeederports
236
237 - def getFeedServerPort(self):
238 if self.feedServer: 239 return self.feedServer.getPortNum() 240 else: 241 return None
242
243 - def create(self, avatarId, type, moduleName, methodName, nice, 244 conf):
245 def getBundles(): 246 # set up bundles as we need to have a pb connection to 247 # download the modules -- can't do that in the kid yet. 248 moduleNames = [moduleName] 249 for plugs in conf.get('plugs', {}).values(): 250 for plug in plugs: 251 moduleNames.append(plug['module-name']) 252 self.debug('setting up bundles for %r', moduleNames) 253 return self.medium.bundleLoader.getBundles(moduleName=moduleNames)
254 255 def spawnJob(bundles): 256 return self.jobHeaven.spawn(avatarId, type, moduleName, 257 methodName, nice, bundles, conf) 258 259 def createError(failure): 260 failure.trap(errors.ComponentCreateError) 261 self.debug('create deferred for %s failed, forwarding error', 262 avatarId) 263 return failure 264 265 def success(res): 266 self.debug('create deferred for %s succeeded (%r)', 267 avatarId, res) 268 return res 269 270 self.info('Starting component "%s" of type "%s"', avatarId, 271 type) 272 d = getBundles() 273 d.addCallback(spawnJob) 274 d.addCallback(success) 275 d.addErrback(createError) 276 return d 277
278 - def runCheck(self, module, function, *args, **kwargs):
279 def getBundles(): 280 self.debug('setting up bundles for %s', module) 281 return self.medium.bundleLoader.getBundles(moduleName=module)
282 283 def runCheck(bundles): 284 return self.checkHeaven.runCheck(bundles, module, function, 285 *args, **kwargs) 286 287 d = getBundles() 288 d.addCallback(runCheck) 289 return d 290
291 - def getComponents(self):
292 return [job.avatarId for job in self.jobHeaven.getJobInfos()]
293
294 - def killJob(self, avatarId, signum):
295 self.jobHeaven.killJob(avatarId, signum)
296