1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 manager-side objects for components
24
25 API Stability: semi-stable
26 """
27
28 import time
29
30 from twisted.spread import pb
31 from twisted.internet import reactor, defer
32 from twisted.internet import error as terror
33 from twisted.python.failure import Failure
34 from zope.interface import implements
35
36 from flumotion.configure import configure
37 from flumotion.manager import base
38 from flumotion.common import errors, interfaces, keycards, log, config, planet
39 from flumotion.common import messages, common
40 from flumotion.twisted import flavors
41 from flumotion.common.planet import moods
42
43 from flumotion.common.messages import N_
44 T_ = messages.gettexter('flumotion')
45
47 """
48 I am a Manager-side avatar for a component.
49 I live in the L{ComponentHeaven}.
50
51 Each component that logs in to the manager gets an avatar created for it
52 in the manager.
53
54 @cvar avatarId: the L{componentId<common.componentId>}
55 @type avatarId: str
56 @cvar jobState: job state of this avatar's component
57 @type jobState: L{flumotion.common.planet.ManagerJobState}
58 @cvar componentState: component state of this avatar's component
59 @type componentState: L{flumotion.common.planet.ManagerComponentState}
60 """
61
62 logCategory = 'comp-avatar'
63
64 - def __init__(self, heaven, avatarId, remoteIdentity, mind, conf,
65 jobState, clocking):
80
81
83 mood = '(unknown)'
84 if self.componentState:
85 moodValue = self.componentState.get('mood')
86 if moodValue is not None:
87 mood = moods.get(moodValue).name
88 return '<%s %s (mood %s)>' % (self.__class__.__name__,
89 self.avatarId, mood)
90
91
def gotStates(result):
    """
    Turn the combined result of the three remote state calls into the
    argument tuple for the avatar constructor.

    @param result: three (success, value) pairs, in the order of the
                   remote calls: getConfig, getState,
                   getMasterClockInfo
    @type  result: list of (bool, object)

    @returns: (heaven, avatarId, remoteIdentity, mind, conf, jobState,
              clocking); the first four come from the enclosing scope
    @rtype:   tuple
    """
    confResult, jobStateResult, clockingResult = result
    confOk, conf = confResult
    jobStateOk, jobState = jobStateResult
    clockingOk, clocking = clockingResult

    # every call is expected to have succeeded by the time we get here
    assert confOk and jobStateOk and clockingOk

    log.debug('component-avatar', 'got state information')
    return (heaven, avatarId, remoteIdentity, mind,
            conf, jobState, clocking)
100 log.debug('component-avatar', 'calling mind for state information')
101 d = defer.DeferredList([mind.callRemote('getConfig'),
102 mind.callRemote('getState'),
103 mind.callRemote('getMasterClockInfo')],
104 fireOnOneErrback=True)
105 d.addCallback(gotStates)
106 return d
107 makeAvatarInitArgs = classmethod(makeAvatarInitArgs)
108
131
132
def addMessage(self, level, id, format, *args, **kwargs):
    """
    Build a L{flumotion.common.messages.Message} and add it to the
    component state in one step.

    `format' should be marked as translatable in the source with N_;
    *args are stored as its format arguments. Any keyword arguments
    are handed to the message constructor unchanged. See
    L{flumotion.common.messages.Message} for the meaning of level, id
    and the keyword arguments.

    For example::

      self.addMessage(messages.WARNING, 'foo-warning',
                      N_('The answer is %d'), 42, debug='not really')
    """
    translatable = T_(format, *args)
    message = messages.Message(level, translatable, id=id, **kwargs)
    self.addMessageObject(message)
150
152 """
153 Add a message to the planet state.
154
155 @type message: L{flumotion.common.messages.Message}
156 """
157 self.componentState.append('messages', message)
158
175
177
178
179
180
181
182
183
184
def verifyExistingComponentState(conf, state):
    """
    Adopt an already existing component state for this avatar and
    bring its stored configuration in line with the given one.

    If the configuration stored in the state differs from conf, a
    'stale-config' warning message is added to the component state,
    the difference is logged, and the stored configuration is
    replaced by conf.

    @param conf:  the configuration to compare the stored one against
    @param state: the existing component state to adopt
    """
    # attach the current job state and make this the avatar's state
    state.setJobState(self.jobState)
    self.componentState = state

    # give upgradeConfig a chance to process state and conf before
    # the two configurations are compared
    self.upgradeConfig(state, conf)
    if state.get('config') != conf:
        diff = config.dictDiff(state.get('config'), conf)
        diffMsg = config.dictDiffMessageString(diff,
                                               'internal conf',
                                               'running conf')
        self.addMessage(messages.WARNING, 'stale-config',
                        N_("Component logged in with stale "
                           "configuration. Consider stopping "
                           "this component and restarting "
                           "the manager."),
                        debug=("Updating internal conf from "
                               "running conf:\n" + diffMsg))
        self.warning('updating internal component state for %r',
                     state)
        self.debug('changes to conf: %s',
                   config.dictDiffMessageString(diff))
        # NOTE(review): assumed to belong to the mismatch branch;
        # confirm against the original indentation
        state.set('config', conf)
208
def makeNewComponentState(conf):
    """
    Create a fresh component state for this avatar, fill it in from
    the given configuration and the job state, and add it to its flow.

    @param conf: the component configuration; must provide the
                 'parent', 'name' and 'type' keys

    @returns: the newly created component state
    @rtype:   L{flumotion.common.planet.ManagerComponentState}
    """
    newState = planet.ManagerComponentState()
    newState.setJobState(self.jobState)
    self.componentState = newState

    self.upgradeConfig(newState, conf)

    # 'parent' names the flow the new state gets added to
    parentFlow = conf['parent']
    newState.set('name', conf['name'])
    newState.set('type', conf['type'])
    workerName = self.jobState.get('workerName')
    newState.set('workerRequested', workerName)
    newState.set('config', conf)

    self.vishnu.addComponentToFlow(newState, parentFlow)
    return newState
225
226 mState = self.vishnu.getManagerComponentState(self.avatarId)
227 if mState:
228 verifyExistingComponentState(conf, mState)
229 else:
230 makeNewComponentState(conf)
231
233 """
234 Tell the component to provide a master clock.
235
236 @rtype: L{twisted.internet.defer.Deferred}
237 """
def success(clocking):
    """
    Callback for the remote provideMasterClock call: remember the
    clocking information on the avatar and tell the heaven that this
    avatar's master clock is available.

    @param clocking: the result of the remote call; presumably a
                     (host, port, base_time) tuple -- TODO confirm
    """
    # store first, then announce
    self.clocking = clocking
    self.heaven.masterClockAvailable(self.avatarId, clocking)
241
def error(failure):
    """
    Errback for the remote provideMasterClock call: add a
    'provide-master-clock' warning message to the component state and
    release the port that was reserved for the clock.

    Returns None, so the failure is not passed further down the
    callback chain.

    @param failure: the failure of the remote call
    """
    self.addMessage(messages.WARNING, 'provide-master-clock',
                    N_('Failed to provide the master clock'),
                    debug=log.getFailureMessage(failure))
    # port is the one reserved in the enclosing scope before the call
    self.vishnu.releasePortsOnWorker(self.getWorkerName(), [port])
247
248 if self.clocking:
249 self.heaven.masterClockAvailable(self.avatarId, self.clocking)
250 else:
251 (port,) = self.vishnu.reservePortsOnWorker(self.getWorkerName(), 1)
252 self.debug('provideMasterClock on port %d', port)
253
254 d = self.mindCallRemote('provideMasterClock', port)
255 d.addCallbacks(success, error)
256
258 """
259 Returns the port on which a feed server for this component is
260 listening on.
261
262 @rtype: int
263 """
264 return self.vishnu.getWorkerFeedServerPort(self.getWorkerName())
265
267 """
268 Get the IP address of the manager as seen by the component.
269
270 @rtype: str
271 """
272 return self.jobState.get('manager-ip')
273
275 """
276 Return the name of the worker.
277
278 @rtype: str
279 """
280 return self.jobState.get('workerName')
281
283 """
284 Return the PID of the component.
285
286 @rtype: int
287 """
288 return self.jobState.get('pid')
289
291 """
292 Get the name of the component.
293
294 @rtype: str
295 """
296 return self.componentState.get('name')
297
299 """
300 Get the name of the component's parent.
301
302 @rtype: str
303 """
304 return self.componentState.get('parent').get('name')
305
307 """
308 Get the component type name of the component.
309
310 @rtype: str
311 """
312 return self.componentState.get('type')
313
315 """
316 Get the set of eaters that this component eats from.
317
318 @rtype: dict of eaterName -> [(feedId, eaterAlias)]
319 """
320 return self.componentState.get('config').get('eater', {})
321
323 """
324 Get the list of feeders that this component provides.
325
326 @rtype: list of feederName
327 """
328 return self.componentState.get('config').get('feed', [])
329
331 """
332 Get the feedId of a feed provided or consumed by this component.
333
334 @param feedName: The name of the feed (i.e., eater alias or
335 feeder name)
336 @rtype: L{flumotion.common.common.feedId}
337 """
338 return common.feedId(self.getName(), feedName)
339
341 """
342 Get the full feedId of a feed provided or consumed by this
343 component.
344
345 @param feedName: The name of the feed (i.e., eater alias or
346 feeder name)
347 @rtype: L{flumotion.common.common.fullFeedId}
348 """
349 return common.fullFeedId(self.getParentName(), self.getName(), feedName)
350
352 """
353 Get the set of virtual feeds provided by this component.
354
355 @rtype: dict of fullFeedId -> (ComponentAvatar, feederName)
356 """
357 conf = self.componentState.get('config')
358 ret = {}
359 for feedId, feederName in conf.get('virtual-feeds', {}).items():
360 vComp, vFeed = common.parseFeedId(feedId)
361 ffid = common.fullFeedId(self.getParentName(), vComp, vFeed)
362 ret[ffid] = (self, feederName)
363 return ret
364
366 """
367 Get the worker that this component should run on.
368
369 @rtype: str
370 """
371 return self.componentState.get('workerRequested')
372
374 """
375 Get this component's clock master, if any.
376
377 @rtype: avatarId or None
378 """
379 return self.componentState.get('config')['clock-master']
380
382 """
383 Tell the remote component to shut down.
384 """
385 return self.mindCallRemote('stop')
386
390
391 - def eatFrom(self, eaterAlias, fullFeedId, host, port):
395
396 - def feedTo(self, feederName, fullFeedId, host, port):
400
401
403 """
404 Authenticate the given keycard.
405 Gets proxied to L{flumotion.component.bouncers.bouncer.""" \
406 """BouncerMedium.remote_authenticate}
407 The component should be a subclass of
408 L{flumotion.component.bouncers.bouncer.Bouncer}
409
410 @type keycard: L{flumotion.common.keycards.Keycard}
411 """
412 return self.mindCallRemote('authenticate', keycard)
413
415 """
416 Remove a keycard managed by this bouncer because the requester
417 has gone.
418
419 @type keycardId: str
420 """
421 return self.mindCallRemote('removeKeycardId', keycardId)
422
424 """
425 Expire a keycard issued to this component because the bouncer decided
426 to.
427
428 @type keycardId: str
429 """
430 return self.mindCallRemote('expireKeycard', keycardId)
431
432
434 """
435 Called by a component to tell the manager that it's shutting down
436 cleanly (and thus should go to sleeping, rather than lost or sad)
437 """
438 self.debug("shutdown is clean, shouldn't go to lost")
439 self._shutdown_requested = True
440
457
459 """
460 Expire a keycard (and thus the requester's connection)
461 issued to the given requester.
462
463 This is called by the bouncer component that authenticated the keycard.
464
465 @param requesterId: name (avatarId) of the component that originally
466 requested authentication for the given keycardId
467 @type requesterId: str
468 @param keycardId: id of keycard to expire
469 @type keycardId: str
470 """
471
472 if not self.heaven.hasAvatar(requesterId):
473 self.warning('asked to expire keycard %s for requester %s, '
474 'but no such component registered',
475 keycardId, requesterId)
476 raise errors.UnknownComponentError(requesterId)
477
478 return self.heaven.getAvatar(requesterId).expireKeycard(keycardId)
479
481 - def add(self, key, value):
485
486 - def remove(self, key, value):
490
491 -class FeedMap(object, log.Loggable):
492 logName = 'feed-map'
494 self.avatars = {}
495 self._ordered_avatars = []
496 self._dirty = True
497 self._recalc()
498
500 assert avatar.avatarId not in self.avatars
501 self.avatars[avatar.avatarId] = avatar
502 self._ordered_avatars.append(avatar)
503 self._dirty = True
504
506
507
508 del self.avatars[avatar.avatarId]
509 self._ordered_avatars.remove(avatar)
510 self._dirty = True
511
512
513 return [a for a in self.feedDeps.pop(avatar, [])
514 if a.avatarId in self.avatars]
515
528
530 if not self._dirty:
531 return
532 self.feedersForEaters = ffe = {}
533 self.eatersForFeeders = eff = dictlist()
534 self.feeds = dictlist()
535 self.feedDeps = dictlist()
536
537 for comp in self._ordered_avatars:
538 for feederName in comp.getFeeders():
539 self.feeds.add(comp.getFullFeedId(feederName),
540 (comp, feederName))
541 for ffid, pair in comp.getVirtualFeeds().items():
542 self.feeds.add(ffid, pair)
543
544 for eater in self.avatars.values():
545 for pairs in eater.getEaters().values():
546 for feedId, eName in pairs:
547 feeder, fName = self.getFeederAvatar(eater, feedId)
548 if feeder:
549 ffe[eater.getFullFeedId(eName)] = (eName, feeder, fName)
550 eff.add(feeder.getFullFeedId(fName),
551 (fName, eater, eName))
552 else:
553 self.debug('eater %s waiting for feed %s to log in',
554 eater.getFeedId(eName), feedId)
555 self._dirty = False
556
558 """Get the set of feeds that this component is eating from,
559 keyed by eater alias.
560
561 @return: a list of (eaterAlias, feederAvatar, feedName) tuples
562 @rtype: list of (str, ComponentAvatar, str)
563 """
564 self._recalc()
565 ret = []
566 for tups in avatar.getEaters().values():
567 for feedId, alias in tups:
568 ffid = avatar.getFullFeedId(alias)
569 if ffid in self.feedersForEaters:
570 ret.append(self.feedersForEaters[ffid])
571 return ret
572
574 """Get the set of eaters that this component feeds, keyed by
575 feeder name.
576
577 @return: a list of (feederName, eaterAvatar, eaterAlias) tuples
578 @rtype: list of (str, ComponentAvatar, str)
579 """
580 self._recalc()
581 ret = []
582 for feedName in avatar.getFeeders():
583 ffid = avatar.getFullFeedId(feedName)
584 if ffid in self.eatersForFeeders:
585 ret.extend(self.eatersForFeeders[ffid])
586 return ret
587
589 """
590 I handle all registered components and provide L{ComponentAvatar}s
591 for them.
592 """
593
594 implements(interfaces.IHeaven)
595 avatarClass = ComponentAvatar
596
597 logCategory = 'comp-heaven'
598
603
604
613
621
623 master = avatar.getClockMaster()
624 if master:
625 if master == avatar.avatarId:
626 self.debug('Need for %r to provide a clock master',
627 master)
628 avatar.provideMasterClock()
629 else:
630 self.debug('Need to synchronize with clock master %r',
631 master)
632
633
634
635
636 m = self.vishnu.getComponentMapper(master)
637 if m and m.avatar:
638 clocking = m.avatar.clocking
639 if clocking:
640 host, port, base_time = clocking
641 avatar.setClocking(host, port, base_time)
642 else:
643 self.warning('%r should provide a clock master '
644 'but is not doing so', master)
645
646 else:
647 self.debug('clock master not logged in yet, will '
648 'set clocking later')
649
656
663
665 toHost = toAvatar.getClientAddress()
666 toPort = toAvatar.getFeedServerPort()
667
668
669
670
671
672
673
674 fromHost = fromAvatar.mind.broker.transport.getPeer().host
675 if fromHost == toHost:
676 toHost = '127.0.0.1'
677
678 return toHost, toPort
679
def connect(fromComp, fromFeed, toComp, toFeed, method):
    """
    Ask fromComp to connect its feed fromFeed to the feed toFeed of
    toComp, by calling the named method on fromComp.

    If no feed server port is known for the connection, nothing is
    called and the postponement is only logged.

    @param fromComp: the avatar on which the method gets called
    @param fromFeed: name of the feed on fromComp's side
    @param toComp:   the avatar to connect to
    @param toFeed:   name of the feed on toComp's side
    @param method:   name of the method to call on fromComp
    @type  method:   str
    """
    host, port = self.mapNetFeed(fromComp, toComp)

    if not port:
        self.debug('postponing connection to %s: feed server '
                   'unavailable', toComp.getFeedId(toFeed))
        return

    targetFeedId = toComp.getFullFeedId(toFeed)
    doConnect = getattr(fromComp, method)
    doConnect(fromFeed, targetFeedId, host, port)
690
691
def always(otherComp):
    """
    Initiation predicate for the directions table: whatever the other
    component is, the direct method is used on our own component.

    @param otherComp: the peer component; ignored
    @rtype: bool
    """
    return True
def never(otherComp):
    """
    Initiation predicate for the directions table: whatever the other
    component is, the reversed method is used on the other component.

    @param otherComp: the peer component; ignored
    @rtype: bool
    """
    return False
696 directions = [(self.feedMap.getFeedersForEaters,
697 always, 'eatFrom', 'feedTo'),
698 (self.feedMap.getEatersForFeeders,
699 never, 'feedTo', 'eatFrom')]
700
701 myComp = avatar
702 for getPeers, initiate, directMethod, reversedMethod in directions:
703 for myFeedName, otherComp, otherFeedName in getPeers(myComp):
704 if initiate(otherComp):
705
706 connect(myComp, myFeedName, otherComp, otherFeedName,
707 directMethod)
708 else:
709
710 connect(otherComp, otherFeedName, myComp, myFeedName,
711 reversedMethod)
712