1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 worker-side objects for components
24 """
25
26 import os
27 import time
28 import socket
29
30 from twisted.internet import reactor, error, defer
31 from twisted.spread import pb
32 from twisted.python import reflect
33 from zope.interface import implements
34
35 from flumotion.common import interfaces, errors, log, planet, medium
36 from flumotion.common import componentui, common, registry, messages
37 from flumotion.common import interfaces, reflectcall
38
39 from flumotion.common.planet import moods
40 from flumotion.configure import configure
41 from flumotion.twisted import credentials
42 from flumotion.twisted import pb as fpb
43
44 from flumotion.common.messages import N_
45 T_ = messages.gettexter('flumotion')
46
48 """
49 I am a client factory for a component logging in to the manager.
50 """
51 logCategory = 'component'
52 perspectiveInterface = interfaces.IComponentMedium
71
75
76
# NOTE(review): the enclosing method header is not part of this span;
# `self` and the login deferred `d` are supplied by it.

def remoteDisconnected(remoteReference):
    # Registered on the remote reference in loginCallback below.
    if not reactor.killed:
        self.warning('Lost connection to manager, '
                     'will attempt to reconnect')
        return
    self.log('Connection to manager lost due to shutdown')


def loginCallback(reference):
    # Successful login: hand the reference to the medium and ask to be
    # notified when it disconnects.
    self.info("Logged in to manager")
    self.debug("remote reference %r" % reference)
    self._previously_connected = True

    self.medium.setRemoteReference(reference)
    reference.notifyOnDisconnect(remoteDisconnected)


def accessDeniedErrback(failure):
    # trap() lets any other failure type fall through to the next errback.
    failure.trap(errors.NotAuthenticatedError)
    self.warning('Access denied.')


def connectionRefusedErrback(failure):
    failure.trap(error.ConnectionRefusedError)
    self.warning('Connection to manager refused.')


def alreadyLoggedInErrback(failure):
    failure.trap(errors.AlreadyConnectedError)
    self.warning('Component with id %s is already logged in.',
                 self.medium.authenticator.avatarId)


def loginFailedErrback(failure):
    # Last in the chain: logs whatever the specific errbacks did not trap.
    self.warning('Login failed, reason: %s' % failure)


d.addCallback(loginCallback)
# Order matters: specific handlers first, the catch-all last.
for errback in (accessDeniedErrback, connectionRefusedErrback,
                alreadyLoggedInErrback, loginFailedErrback):
    d.addErrback(errback)
114
115
119
def maybe_deferred_chain(procs, *args, **kwargs):
    """
    Call each procedure in procs, in order, with the same arguments.

    The first procedure is invoked through defer.maybeDeferred; each
    following one is added as a callback, so it only runs once the
    previous one has finished successfully.  The result of a procedure
    is not passed on to the next one.

    @param procs: the procedures to call
    @type  procs: sequence of callables
    @param args:  positional arguments handed to every procedure
    @param kwargs: keyword arguments handed to every procedure

    @rtype:   L{twisted.internet.defer.Deferred}
    @returns: a deferred firing with the result of the last procedure,
              or with None when procs is empty
    """
    def call_proc(_, p):
        # The first argument is the previous result, which is ignored.
        log.debug('', 'calling %r', p)
        return p(*args, **kwargs)

    # An empty chain has nothing to run; indexing procs[0] would raise
    # IndexError, so hand back an already-fired deferred instead.
    if not procs:
        return defer.succeed(None)

    d = defer.maybeDeferred(call_proc, None, procs[0])
    for p in procs[1:]:
        d.addCallback(call_proc, p)
    return d
129
130
132 """
133 I am a medium interfacing with a manager-side avatar.
134 I implement a Referenceable for the manager's avatar to call on me.
135 I have a remote reference to the manager's avatar to call upon.
136 I am created by the L{ComponentClientFactory}.
137
138 @cvar authenticator: the authenticator used to log in to manager
139 @type authenticator: L{flumotion.twisted.pb.Authenticator}
140 """
141
142 implements(interfaces.IComponentMedium)
143 logCategory = 'basecompmed'
144
146 """
147 @param component: L{flumotion.component.component.BaseComponent}
148 """
149 self.comp = component
150 self.authenticator = None
151 self.broker = None
152
156
157
158 - def setup(self, config):
160
def getManagerIP(self):
    """
    Return the manager IP as seen by us.

    The peer address of the broker transport is looked up and resolved
    through socket.gethostbyname().

    @returns: the resolved address of the manager
    """
    assert self.remote or self.broker
    # Prefer our own broker; otherwise use the one of the remote reference.
    if self.broker:
        transport = self.broker.transport
    else:
        transport = self.remote.broker.transport
    peer = transport.getPeer()

    # Peers without a .host attribute are indexed instead.
    try:
        peerHost = peer.host
    except AttributeError:
        peerHost = peer[1]

    managerIP = socket.gethostbyname(peerHost)
    self.debug("getManagerIP(): we think the manager's IP is %r" % managerIP)
    return managerIP
176
def getIP(self):
    """
    Return the IP of this component based on connection to the manager.

    Note: this is insufficient in general, and should be replaced by
    network mapping stuff later.

    @returns: the host part of our end of the manager connection
    """
    assert self.remote
    ourAddress = self.remote.broker.transport.getHost()
    ip = ourAddress.host
    self.debug("getIP(): using %r as our IP", ip)
    return ip
188
190 """
191 Set the authenticator the client factory has used to log in to the
192 manager. Can be reused by the component's medium to make
193 feed connections which also get authenticated by the manager's
194 bouncer.
195
196 @type authenticator: L{flumotion.twisted.pb.Authenticator}
197 """
198 self.authenticator = authenticator
199
200
201
203 """
204 Return the state of the component, which will be serialized to a
205 L{flumotion.common.planet.ManagerJobState} object.
206
207 @rtype: L{flumotion.common.planet.WorkerJobState}
208 @returns: state of component
209 """
210
211
212 self.comp.state.set('manager-ip', self.getManagerIP())
213 return self.comp.state
214
216 """
217 Return the configuration of the component.
218
219 @rtype: dict
220 @returns: component's current configuration
221 """
222 return self.comp.config
223
225 self.info('Stopping component')
226 return self.comp.stop()
227
229 """Reload modules in the component."""
230 from flumotion.common.reload import reload as freload
231 freload()
232
234 """Get a WorkerComponentUIState containing details needed to
235 present an admin-side UI state
236 """
237 return self.comp.uiState
238
240 """
241 Base implementation of getMasterClockInfo, can be overridden by
242 subclasses. By default, just returns None.
243 """
244 return None
245
247 """
248 I am the base class for all Flumotion components.
249
250 @ivar name: the name of the component
251 @type name: string
252 @ivar medium: the component's medium
253 @type medium: L{BaseComponentMedium}
254
255 @cvar componentMediumClass: the medium class to use for this component
256 @type componentMediumClass: child class of L{BaseComponentMedium}
257 """
258
259 logCategory = 'basecomp'
260 componentMediumClass = BaseComponentMedium
261
def __init__(self, config, haveError=None):
    """
    Subclasses should not override __init__ at all.

    Instead, they should implement init(), which will be called
    by this implementation automatically.

    See L{flumotion.common.common.InitMixin} for more details.

    @param config:    the component configuration, stored as self.config
    @param haveError: optional callable, stored as self._haveError
    """
    self.debug("initializing %r with config %r", type(self), config)
    self.config, self._haveError = config, haveError

    # Must run after config is stored and before setup() is kicked off.
    common.InitMixin.__init__(self)

    self.setup()
279
280
def init(self):
    """
    A subclass should do as little as possible in its init method.
    In particular, it should not try to access resources.

    Failures during init are marshalled back to the manager through
    the worker's remote_create method, since there is no component state
    proxied to the manager yet at the time of init.
    """
    # Job state: pid and mood are recorded right away.
    self.state = planet.WorkerJobState()
    self.name = self.config['name']
    self.state.set('pid', os.getpid())
    self.setMood(moods.waking)

    self.medium = None

    # UI state, with a key for the CPU usage figure.
    self.uiState = componentui.WorkerComponentUIState()
    self.uiState.addKey('cpu-percent')

    self.plugs = {}
    self._happyWaits = []

    # Baselines for CPU polling; they must exist before the poller does.
    self._lastTime, self._lastClock = time.time(), time.clock()
    self._cpuPoller = common.Poller(self._pollCPU, 5)

    self._shutdownHook = None
312
def do_check(self):
    """
    Subclasses can implement me to run any checks before the component
    performs setup.

    Messages can be added to the component state's 'messages' list key.
    Any error messages added will trigger the component going to sad and
    an L{flumotion.common.errors.ComponentSetupError} being raised;
    do_setup() will not be called.

    In the event of a fatal problem that can't be expressed through an
    error message, this method should raise an exception or return a
    failure.

    It is not necessary to chain up in this function. The return
    value may be a deferred.

    This base implementation runs check_properties() on the configured
    properties, handing it addMessage as the reporting callback.
    """
    properties = self.config['properties']
    return defer.maybeDeferred(self.check_properties, properties,
                               self.addMessage)
333
def check_properties(self, properties, addMessage):
    """
    BaseComponent convenience vmethod for running checks.

    A component implementation can override this method to run any
    checks that it needs to. Typically, a check_properties
    implementation will call the provided addMessage() callback to
    note warnings or errors. For errors, addMessage() will abort the
    check process, setting the mood to sad.

    The base implementation checks nothing.

    @param properties: The component's properties
    @type  properties: dict of string => object
    @param addMessage: Thunk to add a message to the component
                       state. Will raise an exception if the
                       message is of level ERROR.
    @type  addMessage: L{flumotion.common.messages.Message} -> None
    """
    return None
352
def do_setup(self):
    """
    Subclasses can implement me to set up the component before it is
    started.  It should set up the component, possibly opening files
    and resources.
    Non-programming errors should not be raised, but returned as a
    failing deferred.

    The return value may be a deferred.

    This base implementation instantiates and starts every configured
    plug, then runs the chain of do_check methods.
    """
    for socketName, plugConfigs in self.config['plugs'].items():
        self.plugs[socketName] = []
        for plugConfig in plugConfigs:
            # The plug config names the callable that builds the plug.
            plug = reflectcall.reflectCall(plugConfig['module-name'],
                                           plugConfig['function-name'],
                                           plugConfig)
            self.plugs[socketName].append(plug)
            self.debug('Starting plug %r on socket %s',
                       plug, socketName)
            plug.start(self)

    return maybe_deferred_chain(
        common.get_all_methods(self, 'do_check', False), self)
378
def do_stop(self):
    """
    BaseComponent vmethod for stopping.
    The component should do any cleanup it needs, but must not set the
    component's mood to sleeping.

    Stops all plugs, removes all messages from the component state,
    stops the CPU poller and fires the shutdown hook if one is set.

    @returns: None in this base implementation.
    """
    for socketName, plugs in self.plugs.items():
        for plug in plugs:
            self.debug('Stopping plug %r on socket %s', plug, socketName)
            plug.stop(self)

    # Iterate over a snapshot: if get() hands back the live list,
    # removing from it while iterating would leave messages behind.
    for message in list(self.state.get('messages')):
        self.state.remove('messages', message)

    if self._cpuPoller:
        self._cpuPoller.stop()
        self._cpuPoller = None

    if self._shutdownHook:
        self.debug('_stoppedCallback: firing shutdown hook')
        self._shutdownHook()
403
404
def setup(self):
    """
    Sets up the component.  Called during __init__, so be sure not
    to raise exceptions, instead adding messages to the component
    state.

    Runs the chain of do_setup methods; on success the mood is set to
    happy, on failure an error message is added to the component.
    """
    def go_happy(_):
        self.debug('setup complete, going happy')
        self.setMood(moods.happy)

    def got_error(failure):
        # Failures of this type get no additional message here.
        if failure.check(errors.ComponentSetupHandledError):
            return None

        txt = log.getFailureMessage(failure)
        self.warning('Setup failed: %s', txt)
        m = messages.Error(T_(N_("Could not setup component.")),
                           debug=txt,
                           id="component-setup-%s" % self.name)
        self.addMessage(m)
        return None

    self.setMood(moods.waking)

    setups = common.get_all_methods(self, 'do_setup', False)
    d = maybe_deferred_chain(setups, self)
    d.addCallbacks(go_happy, got_error)
435
436
438 """
439 Set the shutdown hook for this component (replacing any previous hook).
440 When a component is stopped, then this hook will be fired.
441 """
442 self._shutdownHook = shutdownHook
443
def stop(self):
    """
    Tell the component to stop.
    The connection to the manager will be closed.
    The job process will also finish.

    @returns: the result of chaining all do_stop methods
    """
    self.debug('BaseComponent.stop')

    self.setMood(moods.waking)

    return maybe_deferred_chain(
        common.get_all_methods(self, 'do_stop', True), self)
458
459
462
464 self.state.set('workerName', workerName)
465
468
473
def setMood(self, mood):
    """
    Set the given mood on the component if it's different from the current
    one.

    Once the component is sad, the mood is not changed any more.
    Deferreds queued in self._happyWaits are called back when the mood
    becomes happy and errbacked when it becomes sad.

    @param mood: the mood to set; its .value is what gets stored
    """
    current = self.state.get('mood')

    if current == mood.value:
        self.log('already in mood %r' % mood)
        return
    if current == moods.sad.value:
        self.info('tried to set mood to %r, but already sad :-(' % mood)
        return

    # Logged from here, not a helper: -2 is a caller-relative frame depth.
    self.doLog(log.DEBUG, -2, 'MOOD changed to %r by caller', mood)
    self.state.set('mood', mood.value)

    if mood == moods.happy:
        fire = lambda waiter: waiter.callback(None)
    elif mood == moods.sad:
        fire = lambda waiter: waiter.errback(errors.ComponentStartError())
    else:
        return

    # Pop before firing so a waiter is never fired twice.
    while self._happyWaits:
        fire(self._happyWaits.pop(0))
497
499 """
500 Gets the mood on the component.
501
502 @rtype: int
503 """
504 return self.state.get('mood')
505
516
def addMessage(self, message):
    """
    Add a message to the component.
    If any of the messages is an error, the component will turn sad.

    @type  message: L{flumotion.common.messages.Message}
    """
    self.state.append('messages', message)
    if message.level != messages.ERROR:
        return

    self.debug('error message, turning sad')
    self.setMood(moods.sad)
    # NOTE(review): the error handler is assumed to fire only for ERROR
    # messages; confirm against the original indentation.
    if self._haveError:
        self._haveError(message)
530
532 """
533 Fix properties that have been renamed from a previous version,
534 and add a warning for them.
535
536 @param properties: properties; will be modified as a result.
537 @type properties: dict
538 @param list: list of (old, new) tuples of property names.
539 @type list: list of tuple of (str, str)
540 """
541 found = []
542 for old, new in list:
543 if properties.has_key(old):
544 found.append((old, new))
545
546 if found:
547 m = messages.Warning(T_(N_(
548 "Your configuration uses deprecated properties. "
549 "Please update your configuration and correct them.\n")),
550 id = "deprecated")
551 for old, new in found:
552 m.add(T_(N_(
553 "Please rename '%s' to '%s'.\n"),
554 old, new))
555 self.debug("Setting new property '%s' to %r", new,
556 properties[old])
557 properties[new] = properties[old]
558 del properties[old]
559 self.addMessage(m)
560
def adminCallRemote(self, methodName, *args, **kwargs):
    """
    Call a remote method on all admin client views on this component.

    This gets serialized through the manager and multiplexed to all
    admin clients, and from there on to all views connected to each
    admin client model.

    Because there can be any number of admin clients that this call
    will go out to, it does not make sense to have one return value.
    This function will return None always.
    """
    if not self.medium:
        self.debug('asked to adminCallRemote(%s, *%r, **%r), but '
                   'no manager.'
                   % (methodName, args, kwargs))
        return

    # The result of the remote call is deliberately not returned.
    self.medium.callRemote("adminCallRemote", methodName,
                           *args, **kwargs)
580
582
583 nowTime = time.time()
584 nowClock = time.clock()
585 deltaTime = nowTime - self._lastTime
586 deltaClock = nowClock - self._lastClock
587 if deltaClock <= 0:
588
589 return
590 CPU = deltaClock/deltaTime
591 self.log('latest CPU use: %r', CPU)
592 self.uiState.set('cpu-percent', CPU)
593 self._lastTime = nowTime
594 self._lastClock = nowClock
595