Package flumotion :: Package twisted :: Module defer
[hide private]

Source Code for Module flumotion.twisted.defer

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_defer -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import random 
 23   
 24  from twisted.internet import defer, reactor 
 25  from twisted.python import reflect 
 26   
 27  # FIXME: this is for HandledException - maybe it should move here instead ? 
 28  from flumotion.common import errors 
 29   
 30  # See flumotion.test.test_defer for examples 
def defer_generator(proc):
    """
    Wrap a generator function so that calling it returns a Deferred.

    The returned wrapper calls proc(*args, **kwargs) to obtain a generator
    and advances it with gen.next().  For each value the generator yields:

      - if it is a Deferred, a callback and an errback are attached to it.
        When it fires, its outcome is stored on that same deferred as a
        callable attribute named C{value} (calling it returns the result,
        or raises a rebuilt exception) and the generator is advanced again;
      - otherwise the wrapper's deferred is fired with the yielded value.

    When the generator is exhausted (StopIteration) the wrapper's deferred
    is fired with None.  Any other Exception raised while advancing the
    generator is passed to the wrapper's deferred as an errback.

    See flumotion.test.test_defer for examples.

    @param proc: callable that returns a generator when called
    @returns:    a function taking the same arguments as proc and returning
                 a L{twisted.internet.defer.Deferred}
    """
    def wrapper(*args, **kwargs):
        gen = proc(*args, **kwargs)
        # the deferred handed back to the caller of the wrapper
        result = defer.Deferred()

        # To support having the errback of last resort, we need to have
        # an errback which runs after all the other errbacks, *at the
        # point at which the deferred is fired*. So users of this code
        # have from between the time the deferred is created and the
        # time that the deferred is fired to attach their errbacks.
        #
        # Unfortunately we only control the time that the deferred is
        # created. So we attach a first errback that then adds an
        # errback to the end of the list. Unfortunately we can't add to
        # the list while the deferred is firing. In a decision between
        # having decent error reporting and being nice to a small part
        # of twisted I chose the former. This code takes a reference to
        # the callback list, so that we can add an errback to the list
        # while the deferred is being fired. It temporarily sets the
        # state of the deferred to not having been fired, so that adding
        # the errbacks doesn't automatically call the newly added
        # methods.
        #
        # NOTE(review): this pokes at the 'callbacks' and 'called'
        # attributes of Deferred - confirm it still matches the Twisted
        # version in use.
        result.__callbacks = result.callbacks
        def with_saved_callbacks(proc, *_args, **_kwargs):
            # note: the 'proc' parameter here shadows the generator
            # function of the enclosing scope, inside this helper only
            saved_callbacks, saved_called = result.callbacks, result.called
            # pretend the deferred has not fired and expose the callback
            # list captured at creation time while proc runs ...
            result.callbacks, result.called = result.__callbacks, False
            proc(*_args, **_kwargs)
            # ... then put back whatever state was there before
            result.callbacks, result.called = saved_callbacks, saved_called

        # Add errback-of-last-resort
        def default_errback(failure, d):
            # an already handled exception just gets propagated up without
            # doing a traceback
            if failure.check(errors.HandledException):
                return failure

            def print_traceback(f):
                import traceback
                print 'flumotion.twisted.defer: ' + \
                    'Unhandled error calling', proc.__name__, ':', f.type
                # print_exc() reports the exception currently being
                # handled by the interpreter rather than 'f' itself.
                # NOTE(review): presumably that is the same error - confirm.
                traceback.print_exc()
            # append the traceback printer to the callback list that was
            # captured when the deferred was created
            with_saved_callbacks (lambda: d.addErrback(print_traceback))
            # bare raise: re-raise the exception currently being handled.
            # NOTE(review): relies on an exception being active while this
            # errback runs - confirm.
            raise
        result.addErrback(default_errback, result)

        def generator_next():
            # advance the generator by one step and dispatch on what it
            # yielded
            try:
                x = gen.next()
                if isinstance(x, defer.Deferred):
                    # resume the generator once the yielded deferred fires
                    x.addCallback(callback, x).addErrback(errback, x)
                else:
                    # a non-deferred value fires the wrapper's deferred
                    result.callback(x)
            except StopIteration:
                result.callback(None)
            except Exception, e:
                result.errback(e)

        def errback(failure, d):
            def raise_error():
                # failure.parents[-1] will be the exception class for local
                # failures and the string name of the exception class
                # for remote failures (which might not exist in our
                # namespace)
                #
                # failure.value will be the tuple of arguments to the
                # exception in the local case, or a string
                # representation of that in the remote case (see
                # pb.CopyableFailure.getStateToCopy()).
                #
                # we can only reproduce a remote exception if the
                # exception class is in our namespace, and it only takes
                # one string argument. if either condition is not true,
                # we wrap the strings in a default Exception.
                k, v = failure.parents[-1], failure.value
                try:
                    if isinstance(k, str):
                        k = reflect.namedClass(k)
                    if isinstance(v, tuple):
                        e = k(*v)
                    else:
                        e = k(v)
                except Exception:
                    e = Exception('%s: %r' % (failure.type, v))
                raise e
            # calling d.value() now raises the rebuilt exception
            d.value = raise_error
            generator_next()

        def callback(result, d):
            # the 'result' parameter shadows the wrapper's deferred in this
            # function only; calling d.value() returns the fired result
            d.value = lambda: result
            generator_next()

        # run the generator up to its first yield
        generator_next()

        return result

    return wrapper
def defer_generator_method(proc):
    """
    Method flavour of defer_generator.

    Returns a function taking (self, *args, **kwargs); every call wraps
    proc with defer_generator and invokes the result with those arguments.

    @param proc: generator method to wrap
    @returns:    whatever the defer_generator wrapper returns for the call
    """
    def method(self, *args, **kwargs):
        # wrapping happens per call, exactly when the method is invoked
        wrapped = defer_generator(proc)
        return wrapped(self, *args, **kwargs)
    return method
131
def defer_call_later(deferred):
    """
    Return a deferred which will fire from a callLater after deferred fires.

    The outcome of the given deferred, result or failure, is handed to the
    returned deferred from a reactor.callLater(0, ...) call, so it is never
    delivered from within the firing of the given deferred itself.

    Failures are forwarded as well; without that, a failing source would
    leave the returned deferred unfired forever.

    As both handlers return None, callbacks added to the given deferred
    after this call see None as its result.

    @param deferred: the deferred to wait for
    @type  deferred: L{twisted.internet.defer.Deferred}

    @returns: a new deferred firing with the same result or failure
    @rtype:   L{twisted.internet.defer.Deferred}
    """
    res = defer.Deferred()

    def fire(result):
        reactor.callLater(0, res.callback, result)

    def fail(failure):
        reactor.callLater(0, res.errback, failure)

    # addCallbacks attaches the pair at the same level: exactly one of the
    # two runs for a given outcome of the source deferred
    deferred.addCallbacks(fire, fail)
    return res
class Resolution:
    """
    I wrap a deferred and guarantee that it gets fired at most once,
    with either a result or an exception.

    @ivar d:     the deferred that gets fired as part of the resolution
    @type d:     L{twisted.internet.defer.Deferred}
    @ivar fired: whether a result or exception was already delivered
    @type fired: bool
    """

    def __init__(self):
        self.fired = False
        self.d = defer.Deferred()

    def cleanup(self):
        """
        Release any resources tied to the resolution.
        Called once, right before the deferred is fired.
        Subclasses can implement me.
        """

    def callback(self, result):
        """
        Resolve successfully, triggering the callbacks with the given
        result.  Does nothing when a result was already reached.
        """
        if self.fired:
            return

        self.fired = True
        self.cleanup()
        self.d.callback(result)

    def errback(self, exception):
        """
        Resolve with a failure, triggering the errbacks with the given
        exception.  Does nothing when a result was already reached.
        """
        if self.fired:
            return

        self.fired = True
        self.cleanup()
        self.d.errback(exception)
180
class RetryingDeferred(object):
    """
    Provides a mechanism to attempt to run some deferred operation until it
    succeeds. On failure, the operation is tried again later, exponentially
    backing off.

    @cvar maxDelay:     upper bound, in seconds, for the delay between tries
    @cvar initialDelay: delay, in seconds, before the first retry
    @cvar factor:       multiplier applied to the delay after each failure
    @cvar jitter:       relative standard deviation of the random spread
                        applied to the delay; a false value disables it
    @ivar delay:        the most recently computed delay, or None if no
                        retry was scheduled yet
    """
    maxDelay = 1800 # Default to 30 minutes
    initialDelay = 5.0
    # Arbitrarily take these constants from twisted's ReconnectingClientFactory
    factor = 2.7182818284590451
    jitter = 0.11962656492
    delay = None

    def __init__(self, deferredCreate, *args, **kwargs):
        """
        Create a new RetryingDeferred. Will call
        deferredCreate(*args, **kwargs) each time a new deferred is needed.
        """
        self._create = deferredCreate
        self._args = args
        self._kwargs = kwargs

        self._masterD = None
        self._running = False
        self._callId = None

    def start(self):
        """
        Start trying. Returns a deferred that will fire when this operation
        eventually succeeds.

        That deferred only errbacks after this RetryingDeferred was
        cancelled: with a CancelledError if a retry was pending, or with
        the failure of the attempt that was in progress (if that attempt
        succeeds, the deferred still fires with its result).

        @rtype: L{twisted.internet.defer.Deferred}
        """
        master = self._masterD = defer.Deferred()
        self._running = True

        self._retry()

        # Return the local reference: when the first attempt completes
        # synchronously, _success has already reset self._masterD to None.
        return master

    def cancel(self):
        """
        Stop retrying.

        A pending retry is cancelled and the deferred returned by start()
        errbacks with a CancelledError.  If an attempt is in progress
        instead, its outcome is what fires that deferred.
        """
        # Reset our state before firing anything, so that an errback which
        # calls start() again does not get its new state overwritten.
        callId, self._callId = self._callId, None
        self._running = False

        if callId:
            callId.cancel()
            master, self._masterD = self._masterD, None
            master.errback(errors.CancelledError())

    def _retry(self):
        # run one attempt; its outcome ends up in _success or _failed
        self._callId = None
        d = self._create(*self._args, **self._kwargs)
        d.addCallbacks(self._success, self._failed)

    def _success(self, val):
        # This is also reached when cancel() was called while the attempt
        # was in progress; cancel() leaves the master deferred in place
        # in that case, so the result is still delivered.
        # Detach the master deferred before firing it, so a callback that
        # calls start() again keeps its freshly created deferred.
        master, self._masterD = self._masterD, None
        master.callback(val)

    def _failed(self, failure):
        if self._running:
            # still wanted: schedule the next attempt, backing off
            self._callId = reactor.callLater(self._nextDelay(), self._retry)
        else:
            # cancelled while the attempt was in progress: report its failure
            master, self._masterD = self._masterD, None
            master.errback(failure)

    def _nextDelay(self):
        """
        Compute, store in self.delay and return the delay in seconds to
        wait before the next attempt.
        """
        if self.delay is None:
            delay = self.initialDelay
        else:
            delay = self.delay * self.factor

        if self.jitter:
            # spread retries out so that clients do not retry in lockstep
            delay = random.normalvariate(delay, delay * self.jitter)

        self.delay = min(delay, self.maxDelay)
        return self.delay
260