1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 manager implementation and related classes
24
25 API Stability: semi-stable
26
27 @var LOCAL_IDENTITY: an identity for the manager itself; can be used
28 to compare against to verify that the manager
29 requested an action
30 @type LOCAL_IDENTITY: L{LocalIdentity}
31 """
32
33 __all__ = ['ManagerServerFactory', 'Vishnu']
34
35 import os
36
37 from twisted.internet import reactor, defer
38 from twisted.python import components, failure
39 from twisted.spread import pb
40 from twisted.cred import portal
41 from zope.interface import implements
42
43 from flumotion.common import bundle, config, errors, interfaces, log, registry
44 from flumotion.common import planet, common, dag, messages, reflectcall, server
45 from flumotion.common.identity import RemoteIdentity, LocalIdentity
46 from flumotion.common.planet import moods
47 from flumotion.configure import configure
48 from flumotion.manager import admin, component, worker, base
49 from flumotion.twisted import checkers
50 from flumotion.twisted import portal as fportal
51
52 from flumotion.common.messages import N_
53 T_ = messages.gettexter('flumotion')
54
55 LOCAL_IDENTITY = LocalIdentity('manager')
56
57 -def _find(list, value, proc=lambda x: x):
59
60 -def _first(list, proc=lambda x: x):
61 for x in list:
62 if proc(x): return x
63
64 -def _any(list, proc=lambda x: x):
65 return filter(proc, list)
66
68
69 def int(*args, **kwargs):
70 for p in procs:
71 if not p(*args, **kwargs): return False
72 return True
73 return int
74
75
76
78 """
79 I implement L{twisted.cred.portal.IRealm}.
80 I make sure that when a L{pb.Avatar} is requested through me, the
81 Avatar being returned knows about the mind (client) requesting
82 the Avatar.
83 """
84
85 implements(portal.IRealm)
86
87 logCategory = 'dispatcher'
88
90 """
91 @param computeIdentity: see L{Vishnu.computeIdentity}
92 @type computeIdentity: callable
93 """
94 self._interfaceHeavens = {}
95 self._computeIdentity = computeIdentity
96 self._bouncer = None
97 self._avatarKeycards = {}
98
100 """
101 @param bouncer: the bouncer to authenticate with
102 @type bouncer: L{flumotion.component.bouncers.bouncer}
103 """
104 self._bouncer = bouncer
105
107 """
108 Register a Heaven as managing components with the given interface.
109
110 @type interface: L{twisted.python.components.Interface}
111 @param interface: a component interface to register the heaven with.
112 """
113 assert isinstance(heaven, base.ManagerHeaven)
114
115 self._interfaceHeavens[interface] = heaven
116
117
141
142 return (pb.IPerspective, avatar, cleanup)
143
144 def got_error(failure):
145
146
147
148
149
150 reactor.callLater(0, mind.broker.transport.loseConnection)
151 return failure
152
153 if pb.IPerspective not in ifaces:
154 raise errors.NoPerspectiveError(avatarId)
155 if len(ifaces) != 2:
156
157 raise errors.NoPerspectiveError(avatarId)
158 iface = [x for x in ifaces if x != pb.IPerspective][0]
159 if iface not in self._interfaceHeavens:
160 self.warning('unknown interface %r', iface)
161 raise errors.NoPerspectiveError(avatarId)
162
163 heaven = self._interfaceHeavens[iface]
164 klass = heaven.avatarClass
165 host = common.addressGetHost(mind.broker.transport.getPeer())
166 d = self._computeIdentity(keycard, host)
167 d.addCallback(lambda identity: \
168 klass.makeAvatar(heaven, avatarId, identity, mind))
169 d.addCallbacks(got_avatar, got_error)
170 return d
171
173 """
174 I am an object that ties together different objects related to a
175 component. I am used as values in a lookup hash in the vishnu.
176 """
178 self.state = None
179 self.id = None
180 self.avatar = None
181 self.jobState = None
182
184 """
185 I am the toplevel manager object that knows about all heavens and factories.
186
187 @cvar dispatcher: dispatcher to create avatars
188 @type dispatcher: L{Dispatcher}
189 @cvar workerHeaven: the worker heaven
190 @type workerHeaven: L{worker.WorkerHeaven}
191 @cvar componentHeaven: the component heaven
192 @type componentHeaven: L{component.ComponentHeaven}
193 @cvar adminHeaven: the admin heaven
194 @type adminHeaven: L{admin.AdminHeaven}
195 """
196
197 implements(server.IServable)
198
199 logCategory = "vishnu"
200
201 - def __init__(self, name, unsafeTracebacks=0, configDir=None):
216 reactor.addSystemEventTrigger('before', 'shutdown', setStopped)
217
218 if configDir is not None:
219 self.configDir = configDir
220 else:
221 self.configDir = os.path.join(configure.configdir,
222 "managers", name)
223
224 self.bouncer = None
225
226 self.bundlerBasket = registry.getRegistry().makeBundlerBasket()
227
228 self._componentMappers = {}
229
230 self.state = planet.ManagerPlanetState()
231 self.state.set('name', name)
232
233 self.plugs = {}
234
235
236
237 self.portal = fportal.BouncerPortal(self.dispatcher, None)
238
239 self.factory = pb.PBServerFactory(self.portal,
240 unsafeTracebacks=unsafeTracebacks)
241 self.connectionInfo = {}
242 self.setConnectionInfo(None, None, None)
243
245 """Cancel any pending operations in preparation for shutdown.
246
247 This method is mostly useful for unit tests; currently, it is
248 not called during normal operation. Note that the caller is
249 responsible for stopping listening on the port, as the
250 manager does not have a handle on the twisted port object.
251
252 @returns: A deferred that will fire when the manager has shut
253 down.
254 """
255 if self.bouncer:
256 return self.bouncer.stop()
257 else:
258 return defer.succeed(None)
259
263
265 """Returns the manager's configuration as a string suitable for
266 importing via loadConfiguration().
267 """
268 return config.exportPlanetXml(self.state)
269
284
def addMessage(self, level, id, format, *args, **kwargs):
    """
    Convenience method that builds a message and adds it to the planet
    state.

    C{format} should be marked as translatable in the source with N_;
    C{*args} are stored as its format arguments. Keyword arguments are
    handed unchanged to the message constructor. See
    L{flumotion.common.messages.Message} for the meaning of the
    remaining arguments.

    For example::

        self.addMessage(messages.WARNING, 'foo-warning',
                        N_('The answer is %d'), 42, debug='not really')
    """
    # Build the translatable first, then wrap it in a message object.
    translatable = T_(format, *args)
    message = messages.Message(level, translatable, id=id, **kwargs)
    self.addMessageObject(message)
302
304 """
305 Add a message to the planet state.
306
307 @type message: L{flumotion.common.messages.Message}
308 """
309 self.state.setitem('messages', message.id, message)
310
312 """
313 Clear any messages with the given message ID from the planet
314 state.
315
316 @type mid: message ID, normally a str
317 """
318 if mid in self.state.get('messages'):
319 self.state.delitem('messages', mid)
320
322 """
323 @param identity: L{flumotion.common.identity.Identity}
324 """
325 socket = 'flumotion.component.plugs.adminaction.AdminAction'
326 if self.plugs.has_key(socket):
327 for plug in self.plugs[socket]:
328 plug.action(identity, message, args, kw)
329
331 """
332 Compute a suitable identity for a remote host. First looks to
333 see if there is a
334 flumotion.component.plugs.identity.IdentityProvider plug
335 installed on the manager, falling back to user@host.
336
337 The identity is only used in the adminaction interface. An
338 example of its use is when you have an adminaction plug that
339 checks an admin's privileges before actually doing an action;
340 the identity object you use here might store the privileges that
341 the admin has.
342
343 @param keycard: the keycard that the remote host used to log in.
344 @type keycard: L{flumotion.common.keycards.Keycard}
345 @param remoteHost: the ip of the remote host
346 @type remoteHost: str
347
348 @rtype: a deferred that will fire a
349 L{flumotion.common.identity.RemoteIdentity}
350 """
351
352 socket = 'flumotion.component.plugs.identity.IdentityProvider'
353 if self.plugs.has_key(socket):
354 for plug in self.plugs[socket]:
355 identity = plug.computeIdentity(keycard, remoteHost)
356 if identity:
357 return identity
358 username = getattr(keycard, 'username', None)
359 return defer.succeed(RemoteIdentity(username, remoteHost))
360
362 """
363 Add a component state for the given component config entry.
364
365 @rtype: L{flumotion.common.planet.ManagerComponentState}
366 """
367
368 self.debug('adding component %s to %s'
369 % (conf.name, parent.get('name')))
370
371 if identity != LOCAL_IDENTITY:
372 self.adminAction(identity, '_addComponent', (conf, parent), {})
373
374 state = planet.ManagerComponentState()
375 state.set('name', conf.name)
376 state.set('type', conf.getType())
377 state.set('workerRequested', conf.worker)
378 state.setMood(moods.sleeping.value)
379 state.set('config', conf.getConfigDict())
380
381 state.set('parent', parent)
382 parent.append('components', state)
383
384 avatarId = conf.getConfigDict()['avatarId']
385
386 self.clearMessage('loadComponent-%s' % avatarId)
387
388
389
390 if not common.checkVersionsCompat(conf.getConfigDict()['version'],
391 configure.versionTuple):
392 m = messages.Warning(T_(N_("This component is configured for "
393 "Flumotion version %s, but you are running version %s.\n"
394 "Please update the configuration of the component.\n"),
395 common.versionTupleToString(conf.getConfigDict()['version']),
396 configure.version))
397 state.append('messages', m)
398
399
400 m = ComponentMapper()
401 m.state = state
402 m.id = avatarId
403 self._componentMappers[state] = m
404 self._componentMappers[avatarId] = m
405
406 return state
407
409 """
410 Add a new config object into the planet state.
411
412 @returns: a list of all components added
413 @rtype: list of L{flumotion.common.planet.ManagerComponentState}
414 """
415
416 self.debug('syncing up planet state with config')
417 added = []
418
def checkNotRunning(comp, parentState):
    """
    Decide whether a component config entry may be added to a parent.

    @param comp:        the component config entry to check
    @param parentState: the atmosphere or flow state the component
                        would be added to
    @returns: True when parentState has no component with this name.
              False when one exists: if its configuration is
              identical, any 'loadComponent-<avatarId>' message is
              cleared; otherwise a warning message carrying the
              configuration diff is added to the planet state.
    """
    name = comp.getName()

    comps = dict([(x.get('name'), x)
                  for x in parentState.get('components')])
    if name not in comps:
        return True

    existing = comps[name]
    parent = existing.get('parent').get('name')
    # Read the configuration from the entry we were given. The previous
    # code used `c`, which only resolved because the enclosing loops
    # happen to name their loop variable `c`.
    newConf = comp.getConfigDict()
    oldConf = existing.get('config')

    if newConf == oldConf:
        self.debug('%s already has component %s running with '
                   'same configuration', parent, name)
        self.clearMessage('loadComponent-%s' % oldConf['avatarId'])
        return False

    self.info('%s already has component %s, but configuration '
              'not the same -- notifying admin', parent, name)

    diff = config.dictDiff(oldConf, newConf)
    diffMsg = config.dictDiffMessageString(diff, 'existing', 'new')

    self.addMessage(messages.WARNING,
                    'loadComponent-%s' % oldConf['avatarId'],
                    N_('Could not load component %r into %r: '
                       'a component is already running with '
                       'this name, but has a different '
                       'configuration.'), name, parent,
                    debug=diffMsg)
    return False
454
455 state = self.state
456 atmosphere = state.get('atmosphere')
457 for c in conf.atmosphere.components.values():
458 if checkNotRunning(c, atmosphere):
459 added.append(self._addComponent(c, atmosphere, identity))
460
461 flows = dict([(x.get('name'), x) for x in state.get('flows')])
462 for f in conf.flows:
463 if f.name in flows:
464 flow = flows[f.name]
465 else:
466 self.info('creating flow %r', f.name)
467 flow = planet.ManagerFlowState(name=f.name, parent=state)
468 state.append('flows', flow)
469
470 for c in f.components.values():
471 if checkNotRunning(c, flow):
472 added.append(self._addComponent(c, flow, identity))
473
474 return added
475
477
478
479 componentsToStart = {}
480 for c in components:
481 workerId = c.get('workerRequested')
482 if not workerId in componentsToStart:
483 componentsToStart[workerId] = []
484 componentsToStart[workerId].append(c)
485 self.debug('_startComponents: componentsToStart %r' % componentsToStart)
486
487 for workerId, componentStates in componentsToStart.items():
488 self._workerCreateComponents(workerId, componentStates)
489
496
498 """
499 Load the configuration from the given XML, merging it on top of
500 the currently running configuration.
501
502 @param file: file to parse, either as an open file object,
503 or as the name of a file to open
504 @type file: str or file
505 @param identity: The identity making this request. This is used by the
506 adminaction logging mechanism in order to say who is
507 performing the action.
508 @type identity: L{flumotion.common.identity.Identity}
509 """
510 self.debug('loading configuration')
511 mid = 'loadComponent-parse-error'
512 if isinstance(file, str):
513 mid += '-%s' % file
514 try:
515 self.clearMessage(mid)
516 conf = config.FlumotionConfigXML(file)
517 conf.parse()
518 return self._loadComponentConfiguration(conf, identity)
519 except errors.ConfigError, e:
520 self.addMessage(messages.WARNING, mid,
521 N_('Invalid component configuration.'),
522 debug=e.args[0])
523 return defer.fail(e)
524 except errors.UnknownComponentError, e:
525 if isinstance(file, str):
526 debug = 'Configuration loaded from file %r' % file
527 else:
528 debug = 'Configuration loaded remotely'
529 self.addMessage(messages.WARNING, mid,
530 N_('Unknown component in configuration: %s.'),
531 e.args[0], debug=debug)
532 return defer.fail(e)
533 except Exception, e:
534 self.addMessage(messages.WARNING, mid,
535 N_('Unknown error while loading configuration.'),
536 debug=log.getExceptionMessage(e))
537 return defer.fail(e)
538
555
561
582 def setupErrback(failure):
583 self.warning('Error starting manager bouncer')
584 d.addCallbacks(setupCallback, setupErrback)
585 return d
586
603
604 __pychecker__ = 'maxargs=11'
def loadComponent(self, identity, componentType, componentId,
                  componentLabel, properties, workerName,
                  plugs, eaters, isClockMaster, virtualFeeds):
    """
    Load a component into the manager configuration and hand it over
    to be started.

    See L{flumotion.manager.admin.AdminAvatar.perspective_loadComponent}
    for a definition of the argument types.

    @returns: the state of the newly added component
    @rtype:   L{flumotion.common.planet.ManagerComponentState}

    @raises NotImplementedError: if isClockMaster is set, or if the
        component type needs synchronization
    @raises errors.ConfigError: if workerName is None
    @raises errors.ComponentAlreadyExistsError: if the parent already
        has a component with this name
    """
    self.debug('loading %s component %s on %s',
               componentType, componentId, workerName)
    parentName, compName = common.parseComponentId(componentId)

    if isClockMaster:
        raise NotImplementedError("Clock master components are not "
                                  "yet supported")
    # This must test the workerName argument. The previous code tested
    # `worker`, which is the imported flumotion.manager.worker module
    # and therefore never None, so the check could not trigger.
    if workerName is None:
        raise errors.ConfigError("Component %r needs to specify the"
                                 " worker on which it should run"
                                 % componentId)

    state = self.state

    compConf = config.ConfigEntryComponent(compName, parentName,
                                           componentType,
                                           componentLabel,
                                           properties,
                                           plugs, workerName,
                                           eaters, isClockMaster,
                                           None, None, virtualFeeds)

    if compConf.defs.getNeedsSynchronization():
        raise NotImplementedError("Components that need "
                                  "synchronization are not yet "
                                  "supported")

    if parentName == 'atmosphere':
        parentState = state.get('atmosphere')
    else:
        flows = dict([(x.get('name'), x) for x in state.get('flows')])
        if parentName in flows:
            parentState = flows[parentName]
        else:
            # Unknown flow name: create the flow on the fly.
            self.info('creating flow %r', parentName)
            parentState = planet.ManagerFlowState(name=parentName,
                                                  parent=state)
            state.append('flows', parentState)

    # Named so as not to shadow the imported twisted.python.components.
    existingNames = [x.get('name')
                     for x in parentState.get('components')]
    if compName in existingNames:
        self.debug('%r already has component %r', parentName, compName)
        raise errors.ComponentAlreadyExistsError(compName)

    compState = self._addComponent(compConf, parentState, identity)

    self._startComponents([compState], identity)

    return compState
664
666 """
667 Create a heaven of the given klass that will send avatars to clients
668 implementing the given medium interface.
669
670 @param interface: the medium interface to create a heaven for
671 @type interface: L{flumotion.common.interfaces.IMedium}
672 @param klass: the type of heaven to create
673 @type klass: an implementor of L{flumotion.common.interfaces.IHeaven}
674 """
675 assert issubclass(interface, interfaces.IMedium)
676 heaven = klass(self)
677 self.dispatcher.registerHeaven(heaven, interface)
678 return heaven
679
690
693
695 """
696 Create the given component. This will currently also trigger
697 a start eventually when the component avatar attaches.
698
699 The component should be sleeping.
700 The worker it should be started on should be present.
701 """
702 m = componentState.get('mood')
703 if m != moods.sleeping.value:
704 raise errors.ComponentMoodError("%r not sleeping but %s" % (
705 componentState, moods.get(m).name))
706
707 p = componentState.get('moodPending')
708 if p != None:
709 raise errors.ComponentMoodError(
710 "%r already has a pending mood %s" % (
711 componentState, moods.get(p).name))
712
713
714 workerId = (componentState.get('workerName')
715 or componentState.get('workerRequested'))
716
717 if not workerId in self.workerHeaven.avatars:
718 raise errors.ComponentNoWorkerError(
719 "worker %s is not logged in" % workerId)
720 else:
721 return self._workerCreateComponents(workerId, [componentState])
722
724
725
726
727 def stopSad():
728
729 self.debug('asked to stop a sad component without avatar')
730 componentState.setMood(moods.sleeping.value)
731 componentState.set('moodPending', None)
732 return defer.succeed(None)
733
def stopLost():
    """
    Handle a stop request for a lost component that has no avatar.

    If the requested worker is logged in, it is asked for its list of
    components. When the component is not in that list, it is set to
    sleeping. When it is still listed, only a warning is logged. If
    there is no such worker, the component is set to sleeping directly.

    @returns: a deferred that fires once the check has completed; it
              errbacks with the original failure if the worker could
              not be queried
    """
    def gotComponents(comps):
        return avatarId in comps

    def gotError(failure):
        # The previous body was an undefined name, which replaced the
        # real failure with a NameError. Log and propagate the original
        # failure instead.
        self.warning('could not get the list of components from '
                     'worker %r: %r', workerName, failure)
        return failure

    def gotJobRunning(running):
        if running:
            self.warning('asked to stop lost component %r, but '
                         'it is still running', avatarId)
        else:
            # avatarId fills the %r placeholder, which previously had
            # no argument.
            self.debug('component %r seems to be really lost, '
                       'setting to sleeping', avatarId)
            componentState.setMood(moods.sleeping.value)
            componentState.set('moodPending', None)

    self.debug('asked to stop a lost component without avatar')
    workerName = componentState.get('workerRequested')
    if workerName and self.workerHeaven.hasAvatar(workerName):
        self.debug('checking if component has job process running')
        d = self.workerHeaven.getAvatar(workerName).getComponents()
        d.addCallbacks(gotComponents, gotError)
        d.addCallback(gotJobRunning)
        return d
    else:
        self.debug('component lacks a worker, setting to sleeping')
        d = defer.maybeDeferred(gotJobRunning, False)
        return d
762
763 def stopUnknown():
764 msg = ('asked to stop a component without avatar in mood %s'
765 % moods.get(mood))
766 self.warning(msg)
767 return defer.fail(errors.ComponentMoodError(msg))
768
769 mood = componentState.get('mood')
770 stoppers = {moods.sad.value: stopSad,
771 moods.lost.value: stopLost}
772 return stoppers.get(mood, stopUnknown)()
773
775 def cleanupAndDisconnectComponent(result):
776 return componentAvatar.disconnect()
777
778 def setSleeping(result):
779 if componentState.get('mood') == moods.sad.value:
780 self.debug('clearing sad mood after having stopped component')
781 componentState.setMood(moods.sleeping.value)
782
783 return result
784
785 d = componentAvatar.stop()
786 d.addCallback(cleanupAndDisconnectComponent)
787 d.addCallback(setSleeping)
788
789 return d
790
792 """
793 Stop the given component.
794 If the component was sad, we clear its sad state as well,
795 since the stop was explicitly requested by the admin.
796
797 @type componentState: L{planet.ManagerComponentState}
798
799 @rtype: L{twisted.internet.defer.Deferred}
800 """
801 self.debug('componentStop(%r)', componentState)
802
803
804 if (componentState.get('moodPending') != None and
805 componentState.get('moodPending') != moods.happy.value):
806 self.debug("Pending mood is %r", componentState.get('moodPending'))
807
808 raise errors.BusyComponentError(componentState)
809
810 m = self.getComponentMapper(componentState)
811 if not m:
812
813
814 self.warning("Component mapper for component state %r doesn't "
815 "exist", componentState)
816 raise errors.UnknownComponentError(componentState)
817 elif not m.avatar:
818 return self._componentStopNoAvatar(componentState, m.id)
819 else:
820 return self._componentStopWithAvatar(componentState, m.avatar)
821
823 """
824 Set the given message on the given component's state.
825 Can be called e.g. by a worker to report on a crashed component.
826 Sets the mood to sad if it is an error message.
827 """
828 if not avatarId in self._componentMappers:
829 self.warning('asked to set a message on non-mapped component %s' %
830 avatarId)
831 return
832
833 m = self._componentMappers[avatarId]
834 m.state.append('messages', message)
835 if message.level == messages.ERROR:
836 self.debug('Error message makes component sad')
837 m.state.setMood(moods.sad.value)
838
839
841
842 workerId = workerAvatar.avatarId
843 self.debug('vishnu.workerAttached(): id %s' % workerId)
844
845
846
847
848 components = [c for c in self._getComponentsToCreate()
849 if c.get('workerRequested') in (workerId, None)]
850
851
852
853
854 d = workerAvatar.getComponents()
855 def workerAvatarComponentListReceived(workerComponents):
856
857 lostComponents = list([c for c in self.getComponentStates()
858 if c.get('workerRequested') == workerId and \
859 c.get('mood') == moods.lost.value])
860 for comp in workerComponents:
861
862
863 if comp in self._componentMappers:
864 compState = self._componentMappers[comp].state
865 if compState in components:
866 components.remove(compState)
867 if compState in lostComponents:
868 lostComponents.remove(compState)
869
870 for compState in lostComponents:
871 self.info(
872 "Restarting previously lost component %s on worker %s",
873 self._componentMappers[compState].id, workerId)
874
875
876
877 compState.set('moodPending', None)
878 compState.setMood(moods.sleeping.value)
879
880 allComponents = components + lostComponents
881
882 if not allComponents:
883 self.debug(
884 "vishnu.workerAttached(): no components for this worker")
885 return
886
887 self._workerCreateComponents(workerId, allComponents)
888 d.addCallback(workerAvatarComponentListReceived)
889
890 reactor.callLater(0, self.componentHeaven.feedServerAvailable,
891 workerId)
892
894 """
895 Create the list of components on the given worker, sequentially, but
896 in no specific order.
897
898 @param workerId: avatarId of the worker
899 @type workerId: string
900 @param components: components to start
901 @type components: list of
902 L{flumotion.common.planet.ManagerComponentState}
903 """
904 self.debug("_workerCreateComponents: workerId %r, components %r" % (
905 workerId, components))
906
907 if not workerId in self.workerHeaven.avatars:
908 self.debug('worker %s not logged in yet, delaying '
909 'component start' % workerId)
910 return defer.succeed(None)
911
912 workerAvatar = self.workerHeaven.avatars[workerId]
913
914 d = defer.Deferred()
915
916 for c in components:
917 componentType = c.get('type')
918 conf = c.get('config')
919 self.debug('scheduling create of %s on %s'
920 % (conf['avatarId'], workerId))
921 d.addCallback(self._workerCreateComponentDelayed,
922 workerAvatar, c, componentType, conf)
923
924 d.addCallback(lambda result: self.debug(
925 '_workerCreateComponents(): completed setting up create chain'))
926
927
928 self.debug('_workerCreateComponents(): triggering create chain')
929 d.callback(None)
930
931 return d
932
949
950
951
952
954 self.debug('got avatarId %s for state %s' % (result, componentState))
955 m = self._componentMappers[componentState]
956 assert result == m.id, "received id %s is not the expected id %s" % (
957 result, m.id)
958
982
984
985 workerId = workerAvatar.avatarId
986 self.debug('vishnu.workerDetached(): id %s' % workerId)
987
989
990 if flowName == 'atmosphere':
991
992 flow = self.state.get('atmosphere')
993 else:
994 flow = _first(self.state.get('flows'),
995 lambda x: x.get('name') == flowName)
996 if not flow:
997 self.info('Creating flow "%s"' % flowName)
998 flow = planet.ManagerFlowState()
999 flow.set('name', flowName)
1000 flow.set('parent', self.state)
1001 self.state.append('flows', flow)
1002
1003 componentState.set('parent', flow)
1004 flow.append('components', componentState)
1005
1007
1008 m = (self.getComponentMapper(componentAvatar.avatarId)
1009 or ComponentMapper())
1010
1011 m.state = componentAvatar.componentState
1012 m.jobState = componentAvatar.jobState
1013 m.id = componentAvatar.avatarId
1014 m.avatar = componentAvatar
1015
1016 self._componentMappers[m.state] = m
1017 self._componentMappers[m.jobState] = m
1018 self._componentMappers[m.id] = m
1019 self._componentMappers[m.avatar] = m
1020
1022
1023
1024 self.debug('unregisterComponent(%r): cleaning up state' %
1025 componentAvatar)
1026
1027 if componentAvatar not in self._componentMappers:
1028 self.warning("Component logging out that was incompletely logged "
1029 " in, ignoring")
1030 return
1031
1032 m = self._componentMappers[componentAvatar]
1033
1034
1035 try:
1036 del self._componentMappers[m.jobState]
1037 except KeyError:
1038 self.warning('Could not remove jobState for %r' % componentAvatar)
1039 m.jobState = None
1040
1041 m.state.set('pid', None)
1042 m.state.set('workerName', None)
1043 m.state.set('moodPending', None)
1044
1045
1046 del self._componentMappers[m.avatar]
1047 m.avatar = None
1048
1050 cList = self.state.getComponents()
1051 self.debug('getComponentStates(): %d components' % len(cList))
1052 for c in cList:
1053 self.log(repr(c))
1054 mood = c.get('mood')
1055 if mood == None:
1056 self.warning('%s has mood None' % c.get('name'))
1057
1058 return cList
1059
1061 """
1062 Empty the planet of the given component.
1063
1064 @returns: a deferred that will fire when all listeners have been
1065 notified of the removal of the component.
1066 """
1067 self.debug('deleting component %r from state', componentState)
1068 c = componentState
1069 if c not in self._componentMappers:
1070 raise errors.UnknownComponentError(c)
1071
1072 flow = componentState.get('parent')
1073 if (c.get('moodPending') != None
1074 or c.get('mood') is not moods.sleeping.value):
1075 raise errors.BusyComponentError(c)
1076
1077 del self._componentMappers[self._componentMappers[c].id]
1078 del self._componentMappers[c]
1079 return flow.remove('components', c)
1080
1082 """
1083 Empty the planet of a flow.
1084
1085 @returns: a deferred that will fire when the flow is removed.
1086 """
1087
1088
1089 flow = _find(self.state.get('flows'), flowName, lambda x: x.get('name'))
1090 components = flow.get('components')
1091
1092
1093 isBusy = lambda c: c.get('moodPending') != None
1094 isNotSleeping = lambda c: c.get('mood') is not moods.sleeping.value
1095 pred = _fint(isBusy, isNotSleeping)
1096 if _any(components, pred):
1097 raise errors.BusyComponentError(_first(components, pred))
1098
1099 for c in components:
1100 del self._componentMappers[self._componentMappers[c].id]
1101 del self._componentMappers[c]
1102 d = flow.empty()
1103 d.addCallback(lambda _: self.state.remove('flows', flow))
1104 return d
1105
1107 """
1108 Empty the planet of all components, and flows. Also clears all
1109 messages.
1110
1111 @returns: a deferred that will fire when the planet is empty.
1112 """
1113 for mid in self.state.get('messages').keys():
1114 self.clearMessage(mid)
1115
1116
1117 components = self.getComponentStates()
1118
1119
1120 isPending = lambda c: c.get('moodPending') != None
1121 components = filter(isPending, components)
1122 if len(components) > 0:
1123 state = components[0]
1124 raise errors.BusyComponentError(state,
1125 "moodPending is %s" % moods.get(state.get('moodPending')))
1126
1127
1128 components = self.getComponentStates()
1129 isNotSleeping = lambda c: c.get('mood') is not moods.sleeping.value
1130 components = filter(isNotSleeping, components)
1131
1132
1133 d = defer.Deferred()
1134
1135 self.debug('need to stop %d components: %r' % (
1136 len(components), components))
1137
1138
1139
1140 for c in components:
1141 avatar = self._componentMappers[c].avatar
1142
1143
1144 if avatar:
1145 d.addCallback(lambda result, a: a.stop(), avatar)
1146 else:
1147 assert (c.get('mood') is moods.sad.value or
1148 c.get('mood') is moods.lost.value)
1149
1150 d.addCallback(self._emptyPlanetCallback)
1151
1152
1153 reactor.callLater(0, d.callback, None)
1154
1155 return d
1156
1158
1159
1160 components = self.getComponentStates()
1161 self.debug('_emptyPlanetCallback: need to delete %d components' %
1162 len(components))
1163
1164 for c in components:
1165 if c.get('mood') is not moods.sleeping.value:
1166 self.warning('Component %s is not sleeping', c.get('name'))
1167
1168 m = self._componentMappers[c]
1169 del self._componentMappers[m.id]
1170 del self._componentMappers[c]
1171
1172
1173 l = self._componentMappers.keys()
1174 if len(l) > 0:
1175 self.warning('mappers still has keys %r' % (repr(l)))
1176
1177 dList = []
1178
1179 dList.append(self.state.get('atmosphere').empty())
1180
1181 for f in self.state.get('flows'):
1182 self.debug('appending deferred for emptying flow %r' % f)
1183 dList.append(f.empty())
1184 self.debug('appending deferred for removing flow %r' % f)
1185 dList.append(self.state.remove('flows', f))
1186 self.debug('appended deferreds')
1187
1188 dl = defer.DeferredList(dList)
1189 return dl
1190
1192 """
1193 @rtype: list of L{flumotion.common.planet.ManagerComponentState}
1194 """
1195
1196 components = self.state.getComponents()
1197
1198
1199
1200
1201
1202 isSleeping = lambda c: c.get('mood') == moods.sleeping.value
1203 components = filter(isSleeping, components)
1204 return components
1205
1207
1208 if not workerName in self.workerHeaven.avatars:
1209 raise errors.ComponentNoWorkerError("Worker %s not logged in?"
1210 % workerName)
1211
1212 return self.workerHeaven.avatars[workerName]
1213
1215 if workerName in self.workerHeaven.avatars:
1216 return self._getWorker(workerName).feedServerPort
1217 return None
1218
1220 """
1221 Requests a number of ports on the worker named workerName. The
1222 ports will be reserved for the use of the caller until
1223 releasePortsOnWorker is called.
1224
1225 @returns: a list of ports as integers
1226 """
1227 return self._getWorker(workerName).reservePorts(numPorts)
1228
1230 """
1231 Tells the manager that the given ports are no longer being used,
1232 and may be returned to the allocation pool.
1233 """
1234 try:
1235 return self._getWorker(workerName).releasePorts(ports)
1236 except errors.ComponentNoWorkerError, e:
1237 self.warning('could not release ports: %r' % e.args)
1238
1240 """
1241 Look up an object mapper given the object.
1242
1243 @rtype: L{ComponentMapper} or None
1244 """
1245 if object in self._componentMappers.keys():
1246 return self._componentMappers[object]
1247
1248 return None
1249
1251 """
1252 Look up an object mapper given the object.
1253
1254 @rtype: L{ComponentMapper} or None
1255 """
1256 if object in self._componentMappers.keys():
1257 return self._componentMappers[object].state
1258
1259 return None
1260