1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 worker-side objects to handle worker clients
24 """
25
26 import signal
27
28 from twisted.cred import portal
29 from twisted.internet import defer, reactor
30 from twisted.spread import pb
31 from twisted.internet import error
32 from zope.interface import implements
33
34 from flumotion.common import errors, interfaces, log, bundleclient
35 from flumotion.common import common, medium, messages, worker
36 from flumotion.twisted import checkers, fdserver
37 from flumotion.twisted import pb as fpb
38 from flumotion.twisted import defer as fdefer
39 from flumotion.configure import configure
40 from flumotion.worker import medium, job, feedserver
41
43 logCategory = "proxybouncer"
44
45 """
46 I am a bouncer that proxies authenticate calls to a remote FPB root
47 object.
48 """
50 """
51 @param remote: an object that has .callRemote()
52 """
53 self._remote = remote
54
56 """
57 Call me before asking me to authenticate, so I know what I can
58 authenticate.
59 """
60 return self._remote.callRemote('getKeycardClasses')
61
66
67
69 """
70 I am the main object in the worker process, managing jobs and everything
71 related.
72 I live in the main worker process.
73
74 @ivar authenticator: authenticator worker used to log in to manager
75 @type authenticator: L{flumotion.twisted.pb.Authenticator}
76 @ivar medium:
77 @type medium: L{WorkerMedium}
78 @ivar jobHeaven:
79 @type jobHeaven: L{ComponentJobHeaven}
80 @ivar checkHeaven:
81 @type checkHeaven: L{CheckJobHeaven}
82 @ivar workerClientFactory:
83 @type workerClientFactory: L{WorkerClientFactory}
84 @ivar feedServerPort: TCP port the Feed Server is listening on
85 @type feedServerPort: int
86 """
87
88 implements(interfaces.IFeedServerParent)
89
90 logCategory = 'workerbrain'
91
123
125 def sighup(signum, frame):
126 if self._oldHUPHandler:
127 self.log('got SIGHUP, calling previous handler %r',
128 self._oldHUPHandler)
129 self._oldHUPHandler(signum, frame)
130 self.debug('telling kids about new log file descriptors')
131 self.jobHeaven.rotateChildLogFDs()
132
133 handler = signal.signal(signal.SIGHUP, sighup)
134 if handler == signal.SIG_DFL or handler == signal.SIG_IGN:
135 self._oldHUPHandler = None
136 else:
137 self._oldHUPHandler = handler
138
140 """
141 Start listening on FeedServer (incoming eater requests) and
142 JobServer (through which we communicate with our children) ports
143
144 @returns: True if we successfully listened on both ports
145 """
146
147 try:
148 self.feedServer = self._makeFeedServer()
149 except error.CannotListenError, e:
150 self.warning("Failed to listen on feed server port: %r", e)
151 return False
152
153 try:
154 self.jobHeaven.listen()
155 except error.CannotListenError, e:
156 self.warning("Failed to listen on job server port: %r", e)
157 return False
158
159 try:
160 self.checkHeaven.listen()
161 except error.CannotListenError, e:
162 self.warning("Failed to listen on check server port: %r", e)
163 return False
164
165 return True
166
168 """
169 @returns: L{flumotion.worker.feedserver.FeedServer}
170 """
171 port = None
172 if self.options.randomFeederports:
173 port = 0
174 elif not self.options.feederports:
175 self.info('Not starting feed server because no port is '
176 'configured')
177 return None
178 else:
179 port = self.options.feederports[-1]
180
181 return feedserver.FeedServer(self, ProxyBouncer(self), port)
182
def login(self, managerConnectionInfo):
    """
    Remember the manager connection info and ask the medium to start
    connecting with it.

    @param managerConnectionInfo: connection info for the manager; stored
                                  on this object as C{managerConnectionInfo}
                                  and handed unchanged to
                                  C{self.medium.startConnecting()}
    """
    connectionInfo = managerConnectionInfo
    # Store first, so the info is already available on the brain when the
    # medium starts connecting.
    self.managerConnectionInfo = connectionInfo
    self.medium.startConnecting(connectionInfo)
186
187 - def callRemote(self, methodName, *args, **kwargs):
189
191 if self.stopping:
192 self.warning("Already shutting down, ignoring shutdown request")
193 return
194
195 self.info("Reactor shutting down, stopping jobHeaven")
196 self.stopping = True
197
198 l = [self.jobHeaven.shutdown(), self.checkHeaven.shutdown()]
199 if self.feedServer:
200 l.append(self.feedServer.shutdown())
201
202 return fdefer.defer_call_later(defer.DeferredList(l))
203
204
def feedToFD(self, componentId, feedName, fd, eaterId):
    """
    Called from the FeedAvatar to pass a file descriptor on to
    the job running the component for this feeder.

    The component is looked up by C{componentId} in
    C{self.jobHeaven.avatars}. If it is not there, a warning is logged
    and nothing is handed off.

    @param componentId: key of the component's avatar in the job heaven
    @param feedName:    passed through to the avatar's C{sendFeed()}
    @param fd:          the file descriptor to hand off
    @param eaterId:     passed through to the avatar's C{sendFeed()}

    @returns: False if no such component is running; otherwise whatever
              the avatar's C{sendFeed()} returns (whether the fd was
              successfully handed off to the component).
    """
    jobAvatars = self.jobHeaven.avatars
    if componentId in jobAvatars:
        return jobAvatars[componentId].sendFeed(feedName, fd, eaterId)

    self.warning("No such component %s running", componentId)
    return False
218
def eatFromFD(self, componentId, eaterAlias, fd, feedId):
    """
    Called from the FeedAvatar to pass a file descriptor on to
    the job running the given component.

    The component is looked up by C{componentId} in
    C{self.jobHeaven.avatars}. If it is not there, a warning is logged
    and nothing is handed off.

    @param componentId: key of the component's avatar in the job heaven
    @param eaterAlias:  passed through to the avatar's C{receiveFeed()}
    @param fd:          the file descriptor to hand off
    @param feedId:      passed through to the avatar's C{receiveFeed()}

    @returns: False if no such component is running; otherwise whatever
              the avatar's C{receiveFeed()} returns (whether the fd was
              successfully handed off to the component).
    """
    jobAvatars = self.jobHeaven.avatars
    if componentId in jobAvatars:
        return jobAvatars[componentId].receiveFeed(eaterAlias, fd, feedId)

    self.warning("No such component %s running", componentId)
    return False
232
233
235 return self.ports, self.options.randomFeederports
236
238 if self.feedServer:
239 return self.feedServer.getPortNum()
240 else:
241 return None
242
243 - def create(self, avatarId, type, moduleName, methodName, nice,
244 conf):
254
255 def spawnJob(bundles):
256 return self.jobHeaven.spawn(avatarId, type, moduleName,
257 methodName, nice, bundles, conf)
258
259 def createError(failure):
260 failure.trap(errors.ComponentCreateError)
261 self.debug('create deferred for %s failed, forwarding error',
262 avatarId)
263 return failure
264
265 def success(res):
266 self.debug('create deferred for %s succeeded (%r)',
267 avatarId, res)
268 return res
269
270 self.info('Starting component "%s" of type "%s"', avatarId,
271 type)
272 d = getBundles()
273 d.addCallback(spawnJob)
274 d.addCallback(success)
275 d.addErrback(createError)
276 return d
277
278 - def runCheck(self, module, function, *args, **kwargs):
282
283 def runCheck(bundles):
284 return self.checkHeaven.runCheck(bundles, module, function,
285 *args, **kwargs)
286
287 d = getBundles()
288 d.addCallback(runCheck)
289 return d
290
293
294 - def killJob(self, avatarId, signum):
296