1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 worker-side objects to handle worker clients
24 """
25
26 import signal
27
28 from twisted.internet import reactor, error
29 from twisted.spread import flavors
30 from zope.interface import implements
31
32 from flumotion.common import errors, interfaces, log
33 from flumotion.common import medium
34 from flumotion.twisted import pb as fpb
35
36 JOB_SHUTDOWN_TIMEOUT = 5
37
38 factoryClass = fpb.ReconnectingFPBClientFactory
40 """
41 I am a client factory for the worker to log in to the manager.
42 """
43 logCategory = 'worker'
44 perspectiveInterface = interfaces.IWorkerMedium
45
47 """
48 @type medium: L{flumotion.worker.medium.WorkerMedium}
49 @type host: str
50 @type port: int
51 """
52 self._managerHost = host
53 self._managerPort = port
54 self.medium = medium
55
56 factoryClass.__init__(self)
57
58 self.maxDelay = 10
59
69
70
72
73
74 def remoteDisconnected(remoteReference):
75 if reactor.killed:
76 self.log('Connection to manager lost due to shutdown')
77 else:
78 self.warning('Lost connection to manager, '
79 'will attempt to reconnect')
80
81 def loginCallback(reference):
82 self.info("Logged in to manager")
83 self.debug("remote reference %r" % reference)
84
85 self.medium.setRemoteReference(reference)
86 reference.notifyOnDisconnect(remoteDisconnected)
87
88 def alreadyConnectedErrback(failure):
89 failure.trap(errors.AlreadyConnectedError)
90 self.warning('A worker with the name "%s" is already connected.' %
91 failure.value)
92
93 def accessDeniedErrback(failure):
94 failure.trap(errors.NotAuthenticatedError)
95 self.warning('Access denied.')
96
97 def connectionRefusedErrback(failure):
98 failure.trap(error.ConnectionRefusedError)
99 self.warning('Connection to %s:%d refused.' % (self._managerHost,
100 self._managerPort))
101
102 def NoSuchMethodErrback(failure):
103 failure.trap(flavors.NoSuchMethod)
104
105 if failure.value.find('remote_getKeycardClasses') > -1:
106 self.warning(
107 "Manager %s:%d is older than version 0.3.0. "
108 "Please upgrade." % (self._managerHost, self._managerPort))
109 return
110
111 return failure
112
113 def loginFailedErrback(failure):
114 self.warning('Login failed, reason: %s' % str(failure))
115
116 d.addCallback(loginCallback)
117 d.addErrback(accessDeniedErrback)
118 d.addErrback(connectionRefusedErrback)
119 d.addErrback(alreadyConnectedErrback)
120 d.addErrback(NoSuchMethodErrback)
121 d.addErrback(loginFailedErrback)
122
124 """
125 I am a medium interfacing with the manager-side WorkerAvatar.
126
127 @ivar brain: the worker brain
128 @type brain: L{WorkerBrain}
129 """
130
131 logCategory = 'workermedium'
132
133 implements(interfaces.IWorkerMedium)
134
136 """
137 @type brain: L{WorkerBrain}
138 """
139 self.brain = brain
140 self.factory = None
141
156
161
162
164 """
165 Gets the set of TCP ports that this worker is configured to use.
166
167 @rtype: 2-tuple: (list of int, bool)
168 @return: list of ports, and a boolean if we allocate ports
169 randomly
170 """
171 return self.brain.getPorts()
172
174 """
175 Return the TCP port the Feed Server is listening on.
176
177 @rtype: int, or NoneType
178 @return: TCP port number, or None if there is no feed server
179 """
180 return self.brain.getFeedServerPort()
181
182 - def remote_create(self, avatarId, type, moduleName, methodName,
183 nice, conf):
184 """
185 Start a component of the given type with the given nice level.
186 Will spawn a new job process to run the component in.
187
188 @param avatarId: avatar identification string
189 @type avatarId: str
190 @param type: type of the component to create
191 @type type: str
192 @param moduleName: name of the module to create the component from
193 @type moduleName: str
194 @param methodName: the factory method to use to create the component
195 @type methodName: str
196 @param nice: nice level
197 @type nice: int
198 @param conf: component config
199 @type conf: dict
200
201 @returns: a deferred fired when the process has started and created
202 the component
203 """
204 return self.brain.create(avatarId, type, moduleName, methodName,
205 nice, conf)
206
208 """
209 Checks if one or more GStreamer elements are present and can be
210 instantiated.
211
212 @param elementNames: names of the Gstreamer elements
213 @type elementNames: list of str
214
215 @rtype: list of str
216 @returns: a list of instantiatable element names
217 """
218 return self.brain.runCheck('flumotion.worker.checks.check',
219 'checkElements', elementNames)
220
222 """
223 Checks if the given module can be imported.
224
225 @param moduleName: name of the module to check
226 @type moduleName: str
227
228 @returns: None or Failure
229 """
230 return self.brain.runCheck('flumotion.worker.checks.check', 'checkImport',
231 moduleName)
232
234 """
235 Runs the given function in the given module with the given arguments.
236
237 @param module: module the function lives in
238 @type module: str
239 @param function: function to run
240 @type function: str
241
242 @returns: the return value of the given function in the module.
243 """
244 return self.brain.runCheck(module, function, *args, **kwargs)
245 remote_runFunction = remote_runCheck
246
248 """
249 I return a list of componentAvatarIds, I have. I am called by the
250 manager soon after I attach to it. This is needed on reconnects
251 so that the manager knows what components it needs to start on me.
252
253 @returns: a list of componentAvatarIds
254 """
255 return self.brain.getComponents()
256
258 """Kill one of the worker's jobs.
259
260 This method is intended for exceptional purposes only; a normal
261 component shutdown is performed by the manager via calling
262 remote_stop() on the component avatar.
263
264 Raises L{flumotion.common.errors.UnknownComponentError} if the
265 job is unknown.
266
267 @param avatarId: the avatar Id of the component, e.g.
268 '/default/audio-encoder'
269 @type avatarId: string
270 @param signum: Signal to send, optional. Defaults to SIGKILL.
271 @type signum: int
272 """
273 self.brain.killJob(avatarId, signum)
274