1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """base classes for PB client-side mediums.
23 """
24
25 import time
26
27 from twisted.spread import pb
28 from twisted.internet import defer
29 from zope.interface import implements
30
31 from flumotion.common import log, interfaces, bundleclient, errors
32 from flumotion.common import messages
33 from flumotion.common.netutils import addressGetHost
34 from flumotion.configure import configure
35 from flumotion.twisted import pb as fpb
36 from flumotion.twisted.compat import reactor
37
38 __version__ = "$Rev$"
39
40
42 """
43 I am a base interface for PB clients interfacing with PB server-side
44 avatars.
45 Used by admin/worker/component to talk to manager's vishnu,
46 and by job to talk to worker's brain.
47
48 @ivar remote: a remote reference to the server-side object on
49 which perspective_(methodName) methods can be called
50 @type remote: L{twisted.spread.pb.RemoteReference}
51 @type bundleLoader: L{flumotion.common.bundleclient.BundleLoader}
52 """
53
54
55
56 implements(interfaces.IMedium)
57 logCategory = "basemedium"
58 remoteLogName = "baseavatar"
59
60 remote = None
61 bundleLoader = None
62
64 """
65 Set the given remoteReference as the reference to the server-side
66 avatar.
67
68 @param remoteReference: L{twisted.spread.pb.RemoteReference}
69 """
70 self.debug('%r.setRemoteReference: %r' % (self, remoteReference))
71 self.remote = remoteReference
72
def nullRemote(x):
    # Disconnect hook registered on the remote reference.
    # The message is built before self.remote is cleared so that it still
    # shows the reference that is going away. The argument is not used.
    notice = '%r: disconnected from %r' % (self, self.remote)
    self.debug(notice)
    self.remote = None
76 self.remote.notifyOnDisconnect(nullRemote)
77
78 self.bundleLoader = bundleclient.BundleLoader(self.callRemote)
79
80
81 tarzan = None
82 jane = None
83 try:
84 transport = remoteReference.broker.transport
85 tarzan = transport.getHost()
86 jane = transport.getPeer()
87 except Exception, e:
88 self.debug("could not get connection info, reason %r" % e)
89 if tarzan and jane:
90 self.debug("connection is from me on %s to remote on %s" % (
91 addressGetHost(tarzan),
92 addressGetHost(jane)))
93
95 """
96 Does the medium have a remote reference to a server-side avatar ?
97 """
98 return self.remote != None
99
101 """
102 Call the given method with the given arguments remotely on the
103 server-side avatar.
104
105 Gets serialized to server-side perspective_ methods.
106
107 @param level: the level we should log at (log.DEBUG, log.INFO, etc)
108 @type level: int
109 @param stackDepth: the number of stack frames to go back to get
110 file and line information, negative or zero.
111 @type stackDepth: non-positive int
112 @param name: name of the remote method
113 @type name: str
114 """
115 if level is not None:
116 debugClass = str(self.__class__).split(".")[-1].upper()
117 startArgs = [self.remoteLogName, debugClass, name]
118 format, debugArgs = log.getFormatArgs(
119 '%s --> %s: callRemote(%s, ', startArgs,
120 ')', (), args, kwargs)
121 logKwArgs = self.doLog(level, stackDepth - 1,
122 format, *debugArgs)
123
124 if not self.remote:
125 self.warning('Tried to callRemote(%s), but we are disconnected'
126 % name)
127 return defer.fail(errors.NotConnectedError())
128
def callback(result):
    # Success path of the remote call: log a "<--" response line carrying
    # an ellipsized form of the result, then hand the result back
    # unchanged. logKwArgs is whatever the request-side doLog call
    # returned and is forwarded to this doLog call as keyword arguments.
    endArgs = (log.ellipsize(result), )
    fmt, fmtArgs = log.getFormatArgs(
        '%s <-- %s: callRemote(%s, ', startArgs, '): %s', endArgs,
        args, kwargs)
    self.doLog(level, -1, fmt, *fmtArgs, **logKwArgs)
    return result
135
def errback(failure):
    # Failure path of the remote call: log a "<--" response line with the
    # failure as the trailing '%r' value, then hand the failure back
    # unchanged. logKwArgs is whatever the request-side doLog call
    # returned and is forwarded to this doLog call as keyword arguments.
    endArgs = (failure, )
    fmt, fmtArgs = log.getFormatArgs(
        '%s <-- %s: callRemote(%s, ', startArgs, '): %r', endArgs,
        args, kwargs)
    self.doLog(level, -1, fmt, *fmtArgs, **logKwArgs)
    return failure
142
143 d = self.remote.callRemote(name, *args, **kwargs)
144 if level is not None:
145 d.addCallbacks(callback, errback)
146 return d
147
149 """
150 Call the given method with the given arguments remotely on the
151 server-side avatar.
152
153 Gets serialized to server-side perspective_ methods.
154 """
155 return self.callRemoteLogging(log.DEBUG, -1, name, *args,
156 **kwargs)
157
159 """
160 Returns the given function in the given module, loading the
161 module from a bundle.
162
163 If we can't find the bundle for the given module, or if the
164 given module does not contain the requested function, we will
165 raise L{flumotion.common.errors.RemoteRunError} (perhaps a
166 poorly chosen error). If importing the module raises an
167 exception, that exception will be passed through unmodified.
168
169 @param module: module the function lives in
170 @type module: str
171 @param function: function to run
172 @type function: str
173
174 @returns: a callable, the given function in the given module.
175 """
176
def gotModule(mod):
    # Resolve the requested attribute on the loaded module. When the
    # module has no such attribute, log a warning and raise
    # RemoteRunError carrying the same message.
    if not hasattr(mod, function):
        complaint = 'No procedure named %s in module %s' % (function,
                                                            module)
        self.warning('%s', complaint)
        raise errors.RemoteRunError(complaint)
    return getattr(mod, function)
185
def gotModuleError(failure):
    # Translate a missing-bundle failure into RemoteRunError.
    # trap() runs first so that only NoBundleError is translated; any
    # other failure is expected to propagate from trap() itself
    # (Twisted Failure.trap semantics).
    failure.trap(errors.NoBundleError)
    reason = 'Failed to find bundle for module %s' % module
    self.warning('%s', reason)
    raise errors.RemoteRunError(reason)
191
192 d = self.bundleLoader.loadModule(module)
193 d.addCallbacks(gotModule, gotModuleError)
194 return d
195
197 """
198 Runs the given function in the given module with the given
199 arguments.
200
201 This method calls getBundledFunction and then invokes the
202 function. Any error raised by getBundledFunction or by invoking
203 the function will be passed through unmodified.
204
205 Callers that expect to return their result over a PB connection
206 should catch nonserializable exceptions so as to prevent nasty
207 backtraces in the logs.
208
209 @param module: module the function lives in
210 @type module: str
211 @param function: function to run
212 @type function: str
213
214 @returns: the return value of the given function in the module.
215 """
216 self.debug('runBundledFunction(%r, %r)', module, function)
217
def gotFunction(proc):
    # proc is the callable delivered by getBundledFunction. Call it with
    # the caller's positional and keyword arguments and return a deferred
    # for the outcome.

    def invocationError(failure):
        # Log which call failed and the failure's message, then return
        # the failure untouched.
        reason = log.getFailureMessage(failure)
        self.warning('Exception raised while calling '
                     '%s.%s(*args=%r, **kwargs=%r): %s',
                     module, function, args, kwargs, reason)
        return failure

    self.debug('calling %s.%s(%r, %r)', module, function, args,
               kwargs)
    # Going through maybeDeferred gives a deferred to attach the logging
    # errback to.
    outcome = defer.maybeDeferred(proc, *args, **kwargs)
    outcome.addErrback(invocationError)
    return outcome
232
233 d = self.getBundledFunction(module, function)
234 d.addCallback(gotFunction)
235 return d
236
237
270
271 if self.remote:
272 self.log('pinging')
273 d = self.callRemoteLogging(log.LOG, 0, 'ping')
274 d.addCallbacks(pingback, pingFailed)
275 else:
276 self.info('tried to ping, but disconnected yo')
277
278 self._pingDC = self._clock.callLater(self._pingInterval,
279 self._ping)
280
285
293 d.addCallback(cb)
294 return d
295
307
316
318 if self.remote:
319 self.remote.broker.transport.loseConnection()
320
329 self.remote.notifyOnDisconnect(stopPingingCb)
330
331 self.startPinging(self._disconnect)
332
334 """
335 Sets a marker that will be prefixed to the log strings. Setting this
336 marker to multiple elements at a time helps debugging.
337 @param marker: A string to prefix all the log strings.
338 @type marker: str
339 """
340 self.writeMarker(marker, level)
341