1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 implementation of a PB Client to interface with feedserver.py
24 """
25
26 import socket
27 import os
28
29 from twisted.internet import reactor, main, defer, tcp
30 from twisted.python import failure
31 from zope.interface import implements
32
33 from flumotion.common import log, common, interfaces
34 from flumotion.twisted import pb as fpb
35
36
37
53
56
65
77
78
80 """
81 I am a client for a Feed Server.
82
83 I am used as the remote interface between a component and another
84 component.
85
86 @ivar component: the component this is a feed client for
87 @type component: L{flumotion.component.feedcomponent.FeedComponent}
88 @ivar remote: a reference to a L{FeedAvatar}
89 @type remote: L{twisted.spread.pb.RemoteReference}
90 """
91 logCategory = 'feedmedium'
92 remoteLogName = 'feedserver'
93 implements(interfaces.IFeedMedium)
94
95 remote = None
96
103
def startConnecting(self, host, port, authenticator, timeout=30,
                    bindAddress=None):
    """Begin connecting to a remote feed server and log in to it.

    Optional helper method.  A fresh L{FeedClientFactory} is created
    for this medium and connected through a
    L{PassableClientConnector}.  The factory is remembered in
    C{self._factory}, which is what makes it possible to cancel an
    in-progress connection via the stopConnecting() method.

    Must not be called while a previous factory is still set; this is
    enforced with an assertion.

    @param host:          the remote host name
    @type  host:          str
    @param port:          the tcp port on which to connect
    @type  port:          int
    @param authenticator: the authenticator, normally provided by
                          the worker
    @type  authenticator: L{flumotion.twisted.pb.Authenticator}
    @param timeout:       timeout passed through to the connector
                          (presumably in seconds -- confirm against
                          L{PassableClientConnector})
    @param bindAddress:   local bind address passed through to the
                          connector, or None

    @returns: a deferred that will fire with the remote reference,
              once we have authenticated.
    """
    # Only one connection attempt may be outstanding at a time.
    assert self._factory is None

    factory = FeedClientFactory(self)
    self._factory = factory

    connectorArgs = (host, port, factory, timeout, bindAddress)
    reactor.connectWith(PassableClientConnector, *connectorArgs)

    return self._factory.login(authenticator)
129
def requestFeed(self, host, port, authenticator, fullFeedId):
    """Ask a remote feed server to send us one of its feeds.

    Connects and authenticates through startConnecting(), then
    remotely calls C{sendFeed} for the requested feed.  A pending
    connection attempt can be cancelled via stopConnecting().

    @param host:          the remote host name
    @type  host:          str
    @param port:          the tcp port on which to connect
    @type  port:          int
    @param authenticator: the authenticator, normally provided by
                          the worker
    @type  authenticator: L{flumotion.twisted.pb.Authenticator}
    @param fullFeedId:    the full feed id (/flow/component:feed)
                          offered by the remote side
    @type  fullFeedId:    str

    @returns: a deferred that, if successful, will fire with a pair
              (feedId, fd). In an error case it will errback and
              close the remote connection.
    """
    def onAuthenticated(remote):
        # Keep hold of the reference, then ask the server for the feed.
        self.setRemoteReference(remote)
        return remote.callRemote('sendFeed', fullFeedId)

    def awaitFeed(_result):
        # The result of the remote call itself is not used; the chain
        # continues with whatever self._feedToDeferred produces.
        # NOTE(review): presumably that deferred is fired elsewhere
        # with the (feedId, fd) pair once the fd arrives -- confirm.
        return self._feedToDeferred

    def onFailure(reason):
        self.warning('failed to retrieve %s from %s:%d', fullFeedId,
                     host, port)
        self.debug('failure: %s', log.getFailureMessage(reason))
        self.debug('closing connection')
        self.stopConnecting()
        # Hand the failure on so the caller's own errbacks still run.
        return reason

    loginDeferred = self.startConnecting(host, port, authenticator)
    loginDeferred.addCallback(onAuthenticated)
    loginDeferred.addCallback(awaitFeed)
    loginDeferred.addErrback(onFailure)
    return loginDeferred
177
def sendFeed(self, host, port, authenticator, fullFeedId):
    """Send a feed to a remote feed server.

    This helper method calls startConnecting() to make the
    connection and authenticate, remotely calls C{receiveFeed}, and
    then takes the socket away from the PB transport so that its file
    descriptor can be used to push the feed data.  A pending
    connection attempt can be cancelled via stopConnecting().

    @param host:          the remote host name
    @type  host:          str
    @param port:          the tcp port on which to connect
    @type  port:          int
    @param authenticator: the authenticator, normally provided by
                          the worker
    @type  authenticator: L{flumotion.twisted.pb.Authenticator}
    @param fullFeedId:    the full feed id (/flow/component:eaterAlias)
                          to feed to on the remote side
    @type  fullFeedId:    str

    @returns: a deferred that, if successful, will fire with a pair
              (feedId, fd), where fd is a duplicate of the connection's
              file descriptor. In an error case it will errback and
              close the remote connection.
    """
    def connected(remote):
        # The fd hand-off below relies on the keepSocketAlive
        # attribute, so insist on the expected transport type.
        assert isinstance(remote.broker.transport, _SocketMaybeCloser)
        self.setRemoteReference(remote)
        return remote.callRemote('receiveFeed', fullFeedId)

    def feedSent(res):
        # The statement order below matters: quiesce the transport
        # first, then duplicate the fd, then tear the transport down.
        t = self.remote.broker.transport
        self.debug('stop reading from transport')
        t.stopReading()

        self.debug('flushing PB write queue')
        t.doWrite()
        self.debug('stop writing to transport')
        t.stopWriting()

        # NOTE(review): keepSocketAlive presumably tells
        # _SocketMaybeCloser not to shut the socket down when the
        # transport goes away -- confirm against its definition.
        t.keepSocketAlive = True
        # Our own copy of the descriptor, independent of the transport.
        fd = os.dup(t.fileno())

        # The PB connection is finished; drop our reference to it.
        self.setRemoteReference(None)

        handedOff = defer.Deferred()

        def loseConnection():
            t.connectionLost(failure.Failure(main.CONNECTION_DONE))
            handedOff.callback((fullFeedId, fd))

        # Tear the transport down from a fresh reactor iteration
        # rather than from inside this callback.
        reactor.callLater(0, loseConnection)
        return handedOff

    def error(reason):
        # This path is about sending, not retrieving, so say so.
        self.warning('failed to send %s to %s:%d', fullFeedId,
                     host, port)
        self.debug('failure: %s', log.getFailureMessage(reason))
        self.debug('closing connection')
        self.stopConnecting()
        # Pass the failure on so the caller's own errbacks still run.
        return reason

    d = self.startConnecting(host, port, authenticator)
    d.addCallback(connected)
    d.addCallback(feedSent)
    d.addErrback(error)
    return d
243
245 """Stop a pending or established connection made via
246 startConnecting().
247
248 Stops any established or pending connection to a remote feed
249 server started via the startConnecting() method. Safe to call
250 even if connection has not been started.
251 """
252 if self._factory:
253 self._factory.disconnect()
254 self._factory = None
255
256
257 self.setRemoteReference(None)
258
259
261 self.remote = remoteReference
262
264 return self.remote is not None
265
268
275
301