Package flumotion :: Package worker :: Module worker
[hide private]

Source Code for Module flumotion.worker.worker

  1  # -*- Mode: Python; test-case-name:flumotion.test.test_worker_worker -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """worker-side objects to handle worker clients 
 23  """ 
 24   
 25  import signal 
 26   
 27  from twisted.internet import defer, error, reactor 
 28  from zope.interface import implements 
 29   
 30  from flumotion.common import errors, interfaces, log 
 31  from flumotion.worker import medium, job, feedserver 
 32  from flumotion.twisted.defer import defer_call_later 
 33   
 34  __version__ = "$Rev$" 
 35   
 36   
class ProxyBouncer(log.Loggable):
    """
    I am a bouncer that proxies authenticate calls to a remote FPB root
    object.
    """
    # The docstring above has to be the first statement in the class body;
    # placed after an assignment it would be a discarded string expression
    # and would not end up in __doc__.

    logCategory = "proxybouncer"

    def __init__(self, remote):
        """
        @param remote: an object that has .callRemote()
        """
        self._remote = remote

    def getKeycardClasses(self):
        """
        Call me before asking me to authenticate, so I know what I can
        authenticate.

        @returns: whatever the remote's callRemote('getKeycardClasses')
                  returns
        """
        return self._remote.callRemote('getKeycardClasses')

    def authenticate(self, keycard):
        """
        Forward an authentication request for a keycard to the remote
        object.

        @param keycard: the keycard to authenticate; passed through as-is

        @returns: whatever the remote's callRemote('authenticate', ...)
                  returns
        """
        self.debug("Authenticating keycard %r against remote bouncer",
                   keycard)
        # NOTE(review): the None is the first remote argument; presumably
        # it names the bouncer to use, with None meaning the default one
        # -- confirm against the remote 'authenticate' method.
        return self._remote.callRemote('authenticate', None, keycard)
62 63 # Similar to Vishnu, but for worker related classes 64 65
class WorkerBrain(log.Loggable):
    """
    I am the main object in the worker process, managing jobs and everything
    related.
    I live in the main worker process.

    @ivar options:     the parsed command-line options given to __init__
    @ivar workerName:  the name of this worker, taken from options.name
    @ivar ports:       the configured feeder ports minus the last one
                       (which is kept for the feed server); empty when
                       random feeder ports are requested
    @type ports:       list
    @ivar medium:
    @type medium:      L{medium.WorkerMedium}
    @ivar jobHeaven:
    @type jobHeaven:   L{job.ComponentJobHeaven}
    @ivar checkHeaven:
    @type checkHeaven: L{job.CheckJobHeaven}
    @ivar managerConnectionInfo: the connection info handed to L{login},
                                 or None before login
    @ivar feedServer:  the feed server created by L{listen}, or None if
                       there is none
    @type feedServer:  L{feedserver.FeedServer}
    @ivar stopping:    whether L{shutdownHandler} has already been run
    @type stopping:    bool
    """
    # NOTE(review): earlier documentation also listed the ivars
    # 'authenticator', 'workerClientFactory' and 'feedServerPort'; none of
    # them is assigned anywhere in this class. Use getFeedServerPort() for
    # the feed server port.

    implements(interfaces.IFeedServerParent)

    logCategory = 'workerbrain'

    def __init__(self, options):
        """
        @param options: the optparsed dictionary of command-line options
        @type  options: an object with attributes
        """
        self.options = options
        self.workerName = options.name

        # the last port is reserved for our FeedServer
        if not self.options.randomFeederports:
            self.ports = self.options.feederports[:-1]
        else:
            self.ports = []

        self.medium = medium.WorkerMedium(self)

        # really should be componentJobHeaven, but this is shorter :)
        self.jobHeaven = job.ComponentJobHeaven(self)
        # for ephemeral checks
        self.checkHeaven = job.CheckJobHeaven(self)

        self.managerConnectionInfo = None

        # it's possible we don't have a feed server, if we are
        # configured to have 0 tcp ports; setup this in listen()
        self.feedServer = None

        self.stopping = False
        reactor.addSystemEventTrigger('before', 'shutdown',
                                      self.shutdownHandler)
        self._installHUPHandler()

    def _installHUPHandler(self):
        """
        Install a SIGHUP handler that first calls any previously installed
        Python-level handler and then asks the job heaven to rotate the
        log file descriptors of its children.
        """
        # The attribute must exist before our handler is installed:
        # a SIGHUP arriving right after signal.signal() returns would
        # otherwise make sighup() fail with an AttributeError.
        self._oldHUPHandler = None

        def sighup(signum, frame):
            if self._oldHUPHandler:
                self.log('got SIGHUP, calling previous handler %r',
                         self._oldHUPHandler)
                self._oldHUPHandler(signum, frame)
            self.debug('telling kids about new log file descriptors')
            self.jobHeaven.rotateChildLogFDs()

        handler = signal.signal(signal.SIGHUP, sighup)
        # SIG_DFL and SIG_IGN are markers, not callables, so there is
        # nothing to chain to in those cases.
        if handler not in (signal.SIG_DFL, signal.SIG_IGN):
            self._oldHUPHandler = handler

    def listen(self):
        """
        Start listening on FeedServer (incoming eater requests), on the
        JobServer (through which we communicate with our children) and on
        the check server.

        Stops at the first one that cannot listen; the ones started before
        it are left as they are.

        @returns: True if all of them started successfully, False as soon
                  as one of them raised CannotListenError
        """
        # set up feed server if we have the feederports for it
        try:
            self.feedServer = self._makeFeedServer()
        except error.CannotListenError as e:
            self.warning("Failed to listen on feed server port: %r", e)
            return False

        try:
            self.jobHeaven.listen()
        except error.CannotListenError as e:
            self.warning("Failed to listen on job server port: %r", e)
            return False

        try:
            self.checkHeaven.listen()
        except error.CannotListenError as e:
            self.warning("Failed to listen on check server port: %r", e)
            return False

        return True

    def _makeFeedServer(self):
        """
        Create the feed server on port 0 when random feeder ports are
        requested, otherwise on the last configured feeder port.

        @returns: L{flumotion.worker.feedserver.FeedServer}, or None when
                  no feeder port is configured
        """
        port = None
        if self.options.randomFeederports:
            port = 0
        elif not self.options.feederports:
            self.info('Not starting feed server because no port is '
                      'configured')
            return None
        else:
            port = self.options.feederports[-1]

        # The bouncer proxies through our own callRemote(), i.e. through
        # the worker medium.
        return feedserver.FeedServer(self, ProxyBouncer(self), port)

    def login(self, managerConnectionInfo):
        """
        Remember the manager connection info and tell the medium to start
        connecting with it.

        @param managerConnectionInfo: passed on unchanged to
                                      medium.startConnecting()
        """
        self.managerConnectionInfo = managerConnectionInfo
        self.medium.startConnecting(managerConnectionInfo)

    def callRemote(self, methodName, *args, **kwargs):
        """
        Forward a remote call to the medium.

        @returns: whatever medium.callRemote() returns
        """
        return self.medium.callRemote(methodName, *args, **kwargs)

    def shutdownHandler(self):
        """
        Registered in __init__ as a 'before shutdown' reactor trigger.
        Shuts down both job heavens and, if there is one, the feed server.
        A second invocation only logs a warning and returns None.

        @returns: the result of defer_call_later() wrapping a DeferredList
                  of all shutdown results
        """
        if self.stopping:
            self.warning("Already shutting down, ignoring shutdown request")
            return

        self.info("Reactor shutting down, stopping jobHeaven")
        self.stopping = True

        l = [self.jobHeaven.shutdown(), self.checkHeaven.shutdown()]
        if self.feedServer:
            l.append(self.feedServer.shutdown())
        # Don't fire this other than from a callLater
        return defer_call_later(defer.DeferredList(l))

    ### These methods called by feed server

    def _getJobAvatar(self, componentId):
        """
        Look up the job avatar for a component, warning if there is none.

        @returns: the entry of jobHeaven.avatars for componentId, or None
                  if the component is not in there
        """
        if componentId not in self.jobHeaven.avatars:
            self.warning("No such component %s running", componentId)
            return None

        return self.jobHeaven.avatars[componentId]

    def feedToFD(self, componentId, feedName, fd, eaterId):
        """
        Called from the FeedAvatar to pass a file descriptor on to
        the job running the component for this feeder.

        @returns: whether the fd was successfully handed off to the component.
        """
        avatar = self._getJobAvatar(componentId)
        if avatar is None:
            return False

        return avatar.sendFeed(feedName, fd, eaterId)

    def eatFromFD(self, componentId, eaterAlias, fd, feedId):
        """
        Called from the FeedAvatar to pass a file descriptor on to
        the job running the given component.

        @returns: whether the fd was successfully handed off to the component.
        """
        avatar = self._getJobAvatar(componentId)
        if avatar is None:
            return False

        return avatar.receiveFeed(eaterAlias, fd, feedId)

    ### these methods called by WorkerMedium

    def getPorts(self):
        """
        @returns: a (ports, randomFeederports) tuple: the feeder ports kept
                  in self.ports and the randomFeederports option
        """
        return self.ports, self.options.randomFeederports

    def getFeedServerPort(self):
        """
        @returns: the feed server's getPortNum(), or None when there is no
                  feed server
        """
        if self.feedServer:
            return self.feedServer.getPortNum()
        else:
            return None

    def create(self, avatarId, type, moduleName, methodName, nice,
               conf):
        """
        Fetch the bundles needed by a component and spawn a job for it.

        avatarId, type, moduleName, methodName, nice and conf are passed
        on unchanged to jobHeaven.spawn(), together with the bundles.

        @param conf: the component configuration; its optional 'plugs'
                     entry is walked to collect the 'module-name' of every
                     plug entry, so those bundles get set up as well

        @returns: a deferred firing with the result of jobHeaven.spawn();
                  a ComponentCreateError failure is logged and passed on
        """

        def getBundles():
            # set up bundles as we need to have a pb connection to
            # download the modules -- can't do that in the kid yet.
            moduleNames = [moduleName]
            for plugs in conf.get('plugs', {}).values():
                for plug in plugs:
                    for entry in plug.get('entries', {}).values():
                        moduleNames.append(entry['module-name'])
            self.debug('setting up bundles for %r', moduleNames)
            return self.medium.bundleLoader.getBundles(
                moduleName=moduleNames)

        def spawnJob(bundles):
            return self.jobHeaven.spawn(avatarId, type, moduleName,
                                        methodName, nice, bundles, conf)

        def createError(failure):
            # trap() re-raises anything that is not a ComponentCreateError,
            # so other failures continue down the errback chain unlogged
            failure.trap(errors.ComponentCreateError)
            self.debug('create deferred for %s failed, forwarding error',
                       avatarId)
            return failure

        def success(res):
            self.debug('create deferred for %s succeeded (%r)',
                       avatarId, res)
            return res

        self.info('Starting component "%s" of type "%s"', avatarId,
                  type)
        d = getBundles()
        d.addCallback(spawnJob)
        d.addCallback(success)
        d.addErrback(createError)
        return d

    def runCheck(self, module, function, *args, **kwargs):
        """
        Fetch the bundles for a module and run a check function from it
        through the check heaven.

        @param module:   name of the module to get the bundles for
        @param function: passed on, with module, args and kwargs, to
                         checkHeaven.runCheck()

        @returns: a deferred firing with the result of
                  checkHeaven.runCheck()
        """

        def getBundles():
            self.debug('setting up bundles for %s', module)
            return self.medium.bundleLoader.getBundles(moduleName=module)

        # named differently from the method to keep the two apart
        def doCheck(bundles):
            return self.checkHeaven.runCheck(bundles, module, function,
                                             *args, **kwargs)

        d = getBundles()
        d.addCallback(doCheck)
        return d

    def getComponents(self):
        """
        @returns: a list with the avatarId of every job info known to the
                  job heaven
        """
        # 'jobInfo' rather than 'job': the latter is the imported
        # flumotion.worker.job module.
        return [jobInfo.avatarId
                for jobInfo in self.jobHeaven.getJobInfos()]

    def killJob(self, avatarId, signum):
        """
        Ask the job heaven to kill a job. Nothing is returned.

        @param avatarId: passed on to jobHeaven.killJob()
        @param signum:   passed on to jobHeaven.killJob()
        """
        self.jobHeaven.killJob(avatarId, signum)
300