1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 manager-side objects to handle worker clients
24 """
25
26 from twisted.internet import defer
27
28 from flumotion.manager import base
29 from flumotion.common import errors, interfaces, log, registry
30 from flumotion.common import config, worker, common
31
33 """
34 I am an avatar created for a worker.
35 A reference to me is given when logging in and requesting a worker avatar.
36 I live in the manager.
37
38 @ivar feedServerPort: TCP port the feed server is listening on
39 @type feedServerPort: int
40 """
41 logCategory = 'worker-avatar'
42
43 _portSet = None
44 feedServerPort = None
45
46 - def __init__(self, heaven, avatarId, remoteIdentity, mind,
47 feedServerPort, ports, randomPorts):
56
59
def havePorts(res):
    """
    Callback for the DeferredList of the two remote port queries.

    @param res: results of the 'getFeedServerPort' and 'getPorts' remote
                calls, in that order, each as a (success, value) pair
    @type  res: list of 2-tuples

    @returns: a tuple of (heaven, avatarId, remoteIdentity, mind,
              feedServerPort, ports, random); the first four values
              come from the enclosing scope
    """
    log.debug('worker-avatar', 'got port information')
    feedResult, portsResult = res
    # The success flags are not inspected here; only the values are used.
    _feedOk, feedServerPort = feedResult
    _portsOk, portInfo = portsResult
    # The 'getPorts' value is itself a pair: the ports and the random flag.
    ports, randomPorts = portInfo
    return (heaven, avatarId, remoteIdentity, mind,
            feedServerPort, ports, randomPorts)
67 log.debug('worker-avatar', 'calling mind for port information')
68 d = defer.DeferredList([mind.callRemote('getFeedServerPort'),
69 mind.callRemote('getPorts')],
70 fireOnOneErrback=True)
71 d.addCallback(havePorts)
72 return d
73 makeAvatarInitArgs = classmethod(makeAvatarInitArgs)
74
79
81 """
82 Reserve the given number of ports on the worker.
83
84 @param numPorts: how many ports to reserve
85 @type numPorts: int
86 """
87 return self._portSet.reservePorts(numPorts)
88
90 """
91 Release the given list of ports on the worker.
92
93 @param ports: list of ports to release
94 @type ports: list of int
95 """
96 self._portSet.releasePorts(ports)
97
99 """
100 Create a component of the given type with the given nice level.
101
102 @param avatarId: avatarId the component should use to log in
103 @type avatarId: str
104 @param type: type of the component to create
105 @type type: str
106 @param nice: the nice level to create the component at
107 @type nice: int
108 @param conf: the component's config dict
109 @type conf: dict
110
111 @returns: a deferred that will give the avatarId the component
112 will use to log in to the manager
113 """
114 self.debug('creating %s (%s) on worker %s with nice level %d',
115 avatarId, type, self.avatarId, nice)
116 defs = registry.getRegistry().getComponent(type)
117 try:
118 entry = defs.getEntryByType('component')
119
120 moduleName = defs.getSource()
121 methodName = entry.getFunction()
122 except KeyError:
123 self.warning('no "component" entry in registry of type %s, %s',
124 type, 'falling back to createComponent')
125 moduleName = defs.getSource()
126 methodName = "createComponent"
127
128 self.debug('call remote create')
129 return self.mindCallRemote('create', avatarId, type, moduleName,
130 methodName, nice, conf)
131
133 """
134 Get a list of components that the worker is running.
135
136 @returns: a deferred that will give the avatarIds running on the
137 worker
138 """
139 self.debug('getting component list from worker %s' %
140 self.avatarId)
141 return self.mindCallRemote('getComponents')
142
143
145 """
146 Called by the worker to tell the manager to add a given message to
147 the given component.
148
149 Useful in cases where the component can't report messages itself,
150 for example because it crashed.
151
152 @param avatarId: avatarId of the component the message is about
153 @type message: L{flumotion.common.messages.Message}
154 """
155 self.debug('received message from component %s' % avatarId)
156 self.vishnu.componentAddMessage(avatarId, message)
157
159 """
160 I interface between the Manager and worker clients.
161 For each worker client I create an L{WorkerAvatar} to handle requests.
162 I live in the manager.
163 """
164
165 logCategory = "workerheaven"
166 avatarClass = WorkerAvatar
167
171
172
190
192 """
193 Notify the heaven that the given worker has logged out.
194
195 @type workerAvatar: L{WorkerAvatar}
196 """
197 workerName = workerAvatar.getName()
198 try:
199 self.state.remove('names', workerName)
200 for state in list(self.state.get('workers')):
201 if state.get('name') == workerName:
202 self.state.remove('workers', state)
203 except ValueError:
204 self.warning('worker %s was never registered in the heaven',
205 workerName)
206