1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 Contains the base class for PB client-side mediums.
24 """
25
26 import time
27
28 from twisted.spread import pb
29 from twisted.internet import defer, reactor
30 from zope.interface import implements
31
32 from flumotion.common import log, interfaces, bundleclient, errors, common
33 from flumotion.common import messages
34 from flumotion.configure import configure
35 from flumotion.twisted import pb as fpb
36
38 """
39 I am a base interface for PB clients interfacing with PB server-side
40 avatars.
41 Used by admin/worker/component to talk to manager's vishnu,
42 and by job to talk to worker's brain.
43
44 @ivar remote: a remote reference to the server-side object on
45 which perspective_(methodName) methods can be called
46 @type remote: L{twisted.spread.pb.RemoteReference}
47 @type bundleLoader: L{flumotion.common.bundleclient.BundleLoader}
48 """
49
50
51
52 implements(interfaces.IMedium)
53 logCategory = "basemedium"
54 remoteLogName = "baseavatar"
55
56 remote = None
57 bundleLoader = None
58
60 """
61 Set the given remoteReference as the reference to the server-side
62 avatar.
63
64 @param remoteReference: L{twisted.spread.pb.RemoteReference}
65 """
66 self.debug('%r.setRemoteReference: %r' % (self, remoteReference))
67 self.remote = remoteReference
68 def nullRemote(x):
69 self.debug('%r: disconnected from %r' % (self, self.remote))
70 self.remote = None
71 self.remote.notifyOnDisconnect(nullRemote)
72
73 self.bundleLoader = bundleclient.BundleLoader(self.callRemote)
74
75
76 tarzan = None
77 jane = None
78 try:
79 transport = remoteReference.broker.transport
80 tarzan = transport.getHost()
81 jane = transport.getPeer()
82 except Exception, e:
83 self.debug("could not get connection info, reason %r" % e)
84 if tarzan and jane:
85 self.debug("connection is from me on %s to remote on %s" % (
86 common.addressGetHost(tarzan),
87 common.addressGetHost(jane)))
88
90 """
91 Does the medium have a remote reference to a server-side avatar ?
92 """
93 return self.remote != None
94
96 """
97 Call the given method with the given arguments remotely on the
98 server-side avatar.
99
100 Gets serialized to server-side perspective_ methods.
101
102 @param level: the level we should log at (log.DEBUG, log.INFO, etc)
103 @type level: int
104 @param stackDepth: the number of stack frames to go back to get
105 file and line information, negative or zero.
106 @type stackDepth: non-positive int
107 @param name: name of the remote method
108 @type name: str
109 """
110 if level is not None:
111 debugClass = str(self.__class__).split(".")[-1].upper()
112 startArgs = [self.remoteLogName, debugClass, name]
113 format, debugArgs = log.getFormatArgs(
114 '%s --> %s: callRemote(%s, ', startArgs,
115 ')', (), args, kwargs)
116 logKwArgs = self.doLog(level, stackDepth - 1,
117 format, *debugArgs)
118
119 if not self.remote:
120 self.warning('Tried to callRemote(%s), but we are disconnected'
121 % name)
122 return defer.fail(errors.NotConnectedError())
123
124 def callback(result):
125 format, debugArgs = log.getFormatArgs(
126 '%s <-- %s: callRemote(%s, ', startArgs,
127 '): %s', (log.ellipsize(result), ), args, kwargs)
128 self.doLog(level, -1, format, *debugArgs, **logKwArgs)
129 return result
130
131 def errback(failure):
132 format, debugArgs = log.getFormatArgs(
133 '%s <-- %s: callRemote(%s, ', startArgs,
134 '): %r', (failure, ), args, kwargs)
135 self.doLog(level, -1, format, *debugArgs, **logKwArgs)
136 return failure
137
138 d = self.remote.callRemote(name, *args, **kwargs)
139 if level is not None:
140 d.addCallbacks(callback, errback)
141 return d
142
144 """
145 Call the given method with the given arguments remotely on the
146 server-side avatar.
147
148 Gets serialized to server-side perspective_ methods.
149 """
150 return self.callRemoteLogging(log.DEBUG, -1, name, *args,
151 **kwargs)
152
154 """
155 Returns the given function in the given module, loading the
156 module from a bundle.
157
158 If we can't find the bundle for the given module, or if the
159 given module does not contain the requested function, we will
160 raise L{flumotion.common.errors.RemoteRunError} (perhaps a
161 poorly chosen error). If importing the module raises an
162 exception, that exception will be passed through unmodified.
163
164 @param module: module the function lives in
165 @type module: str
166 @param function: function to run
167 @type function: str
168
169 @returns: a callable, the given function in the given module.
170 """
171 def gotModule(mod):
172 if hasattr(mod, function):
173 return getattr(mod, function)
174 else:
175 msg = 'No procedure named %s in module %s' % (function,
176 module)
177 self.warning('%s', msg)
178 raise errors.RemoteRunError(msg)
179
180 def gotModuleError(failure):
181 failure.trap(errors.NoBundleError)
182 msg = 'Failed to find bundle for module %s' % module
183 self.warning('%s', msg)
184 raise errors.RemoteRunError(msg)
185
186 d = self.bundleLoader.loadModule(module)
187 d.addCallbacks(gotModule, gotModuleError)
188 return d
189
191 """
192 Runs the given function in the given module with the given
193 arguments.
194
195 This method calls getBundledFunction and then invokes the
196 function. Any error raised by getBundledFunction or by invoking
197 the function will be passed through unmodified.
198
199 Callers that expect to return their result over a PB connection
200 should catch nonserializable exceptions so as to prevent nasty
201 backtraces in the logs.
202
203 @param module: module the function lives in
204 @type module: str
205 @param function: function to run
206 @type function: str
207
208 @returns: the return value of the given function in the module.
209 """
210 self.debug('runBundledFunction(%r, %r)', module, function)
211 def gotFunction(proc):
212 def invocationError(failure):
213 self.warning('Exception raised while calling '
214 '%s.%s(*args=%r, **kwargs=%r): %s',
215 module, function, args, kwargs,
216 log.getFailureMessage(failure))
217 return failure
218
219 self.debug('calling %s.%s(%r, %r)', module, function, args,
220 kwargs)
221 d = defer.maybeDeferred(proc, *args, **kwargs)
222 d.addErrback(invocationError)
223 return d
224
225 d = self.getBundledFunction(module, function)
226 d.addCallback(gotFunction)
227 return d
228
262
281
283 if self.remote:
284 self.remote.broker.transport.loseConnection()
285
291 self.remote.notifyOnDisconnect(stopPingingCb)
292
293 self.startPinging(self._disconnect)
294