1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import random
23
24 from twisted.internet import defer, reactor
25 from twisted.python import reflect
26
27
28 from flumotion.common import errors
29
30
32 def wrapper(*args, **kwargs):
33 gen = proc(*args, **kwargs)
34 result = defer.Deferred()
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 result.__callbacks = result.callbacks
54 def with_saved_callbacks(proc, *_args, **_kwargs):
55 saved_callbacks, saved_called = result.callbacks, result.called
56 result.callbacks, result.called = result.__callbacks, False
57 proc(*_args, **_kwargs)
58 result.callbacks, result.called = saved_callbacks, saved_called
59
60
61 def default_errback(failure, d):
62
63
64 if failure.check(errors.HandledException):
65 return failure
66
67 def print_traceback(f):
68 import traceback
69 print 'flumotion.twisted.defer: ' + \
70 'Unhandled error calling', proc.__name__, ':', f.type
71 traceback.print_exc()
72 with_saved_callbacks (lambda: d.addErrback(print_traceback))
73 raise
74 result.addErrback(default_errback, result)
75
76 def generator_next():
77 try:
78 x = gen.next()
79 if isinstance(x, defer.Deferred):
80 x.addCallback(callback, x).addErrback(errback, x)
81 else:
82 result.callback(x)
83 except StopIteration:
84 result.callback(None)
85 except Exception, e:
86 result.errback(e)
87
88 def errback(failure, d):
89 def raise_error():
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104 k, v = failure.parents[-1], failure.value
105 try:
106 if isinstance(k, str):
107 k = reflect.namedClass(k)
108 if isinstance(v, tuple):
109 e = k(*v)
110 else:
111 e = k(v)
112 except Exception:
113 e = Exception('%s: %r' % (failure.type, v))
114 raise e
115 d.value = raise_error
116 generator_next()
117
118 def callback(result, d):
119 d.value = lambda: result
120 generator_next()
121
122 generator_next()
123
124 return result
125
126 return wrapper
127
129 return lambda self, *args, **kwargs: \
130 defer_generator(proc)(self, *args, **kwargs)
131
133 """
134 Return a deferred which will fire from a callLater after d fires
135 """
136 def fire(result, d):
137 reactor.callLater(0, d.callback, result)
138 res = defer.Deferred()
139 deferred.addCallback(fire, res)
140 return res
141
143 """
144 I am a helper class to make sure that the deferred is fired only once
145 with either a result or exception.
146
147 @ivar d: the deferred that gets fired as part of the resolution
148 @type d: L{twisted.internet.defer.Deferred}
149 """
151 self.d = defer.Deferred()
152 self.fired = False
153
155 """
156 Clean up any resources related to the resolution.
157 Subclasses can implement me.
158 """
159 pass
160
162 """
163 Make the result succeed, triggering the callbacks with the given result.
164 If a result was already reached, do nothing.
165 """
166 if not self.fired:
167 self.fired = True
168 self.cleanup()
169 self.d.callback(result)
170
172 """
173 Make the result fail, triggering the errbacks with the given exception.
174 If a result was already reached, do nothing.
175 """
176 if not self.fired:
177 self.fired = True
178 self.cleanup()
179 self.d.errback(exception)
180
182 """
183 Provides a mechanism to attempt to run some deferred operation until it
184 succeeds. On failure, the operation is tried again later, exponentially
185 backing off.
186 """
187 maxDelay = 1800
188 initialDelay = 5.0
189
190 factor = 2.7182818284590451
191 jitter = 0.11962656492
192 delay = None
193
def __init__(self, deferredCreate, *args, **kwargs):
    """
    Set up a RetryingDeferred in its idle state.

    @param deferredCreate: callable producing the deferred for one
                           attempt; it is invoked as
                           deferredCreate(*args, **kwargs).
    @param args:           positional arguments kept for deferredCreate.
    @param kwargs:         keyword arguments kept for deferredCreate.
    """
    # Nothing has been started yet: no pending delayed call, not
    # running, and no deferred handed out to a caller.
    self._callId = None
    self._running = False
    self._masterD = None

    # What to call, and with which arguments, for every attempt.
    self._create, self._args, self._kwargs = deferredCreate, args, kwargs
206
208 """
209 Start trying. Returns a deferred that will fire when this operation
210 eventually succeeds. That deferred will only errback if this
211 RetryingDeferred is cancelled (it will then errback with the result of
212 the next attempt if one is in progress, or a CancelledError. # TODO: yeah?
213 """
214 self._masterD = defer.Deferred()
215 self._running = True
216
217 self._retry()
218
219 return self._masterD
220
222 if self._callId:
223 self._callId.cancel()
224 self._masterD.errback(errors.CancelledError())
225 self._masterD = None
226
227 self._callId = None
228 self._running = False
229
231 self._callId = None
232 d = self._create(*self._args, **self._kwargs)
233 d.addCallbacks(self._success, self._failed)
234
236
237 self._masterD.callback(val)
238 self._masterD = None
239
241 if self._running:
242 next = self._nextDelay()
243 self._callId = reactor.callLater(next, self._retry)
244 else:
245 self._masterD.errback(failure)
246 self._masterD = None
247
260