Package flumotion :: Package common :: Module startset
[hide private]

Source Code for Module flumotion.common.startset

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """a data structure to manage asynchronous avatar starts and shutdowns 
 23  """ 
 24   
 25  from twisted.internet import defer 
 26   
 27  from flumotion.common import log 
 28   
 29  __version__ = "$Rev$" 
 30   
 31   
 32  # This class was factored out of the worker's jobheaven, so sometimes 
 33  # the comments talk about jobs, but they refer to any asynchronous 
 34  # process. For example the multiadmin uses this to manage its 
 35  # connections to remote managers. 
 36   
 37   
class StartSet(log.Loggable):
    """
    Track pending starts and shutdowns of remote processes by avatarId.

    At most one "create" deferred and one "shutdown" deferred are kept
    per avatarId. They are registered with createStart() and
    shutdownStart() and fired by createSuccess(), createFailed() and
    shutdownSuccess(), or indirectly through avatarStarted() and
    avatarStopped().
    """

    def __init__(self, avatarLoggedIn, alreadyStartingError,
                 alreadyRunningError):
        """Create a StartSet, a data structure for managing starts and
        stops of remote processes, for example jobs in a jobheaven.

        @param avatarLoggedIn: a procedure of type avatarId->boolean;
        should return True if the avatarId is logged in and "ready", and
        False otherwise. An avatarId is ready if avatarStarted() could
        have been called on it. This interface is made this way because
        it is assumed that whatever code instantiates a StartSet keeps
        track of "ready" remote processes, and this way we prevent data
        duplication.
        @param alreadyStartingError: An exception class to raise if
        createStart() is called, but there is already a create deferred
        registered for that avatarId.
        @param alreadyRunningError: An exception class to raise if
        createStart() is called, but there is already a "ready" process
        with that avatarId.
        """
        self._avatarLoggedIn = avatarLoggedIn
        self._alreadyStartingError = alreadyStartingError
        self._alreadyRunningError = alreadyRunningError

        self._createDeferreds = {}  # avatarId => deferred
        self._shutdownDeferreds = {}  # avatarId => deferred

    def createStart(self, avatarId):
        """
        Create and register a deferred for starting a given process.
        The deferred will be fired when the process is ready, as
        triggered by a call to createSuccess(), or errbacked by a call
        to createFailed().

        @param avatarId: the id of the remote process, for example the
        avatarId of the job

        @raise alreadyStartingError: (the class given to the
        constructor) if a create deferred is already registered for
        this avatarId.
        @raise alreadyRunningError: (the class given to the
        constructor) if no shutdown is pending and avatarLoggedIn()
        reports the avatarId as logged in.

        @rtype: L{twisted.internet.defer.Deferred}
        """
        self.debug('making create deferred for %s', avatarId)

        # the question of "what jobs do we know about" is answered in
        # three places: the create deferreds hash, the set of logged in
        # avatars, and the shutdown deferreds hash. there are four
        # possible answers:
        if avatarId in self._createDeferreds:
            # (1) a job is already starting: it is in the
            # createdeferreds hash
            self.info('already have a create deferred for %s', avatarId)
            raise self._alreadyStartingError(avatarId)

        d = defer.Deferred()

        if avatarId in self._shutdownDeferreds:
            # (2) a job is shutting down; note it is also in
            # heaven.avatars
            self.debug('waiting for previous %s to shut down like it '
                       'said it would', avatarId)
            # Bind the shutdown deferred that is pending right now; a
            # later shutdownStart() may register a different one.
            shutdown = self._shutdownDeferreds[avatarId]

            def ensureShutdown(res):
                # Returning a deferred from a callback pauses the create
                # deferred until that deferred has fired, so the caller
                # only sees the create result once the previous process
                # has shut down. The lambda swaps the shutdown result
                # for the create result so the latter is what gets
                # passed on.
                shutdown.addCallback(lambda _: res)
                return shutdown

            d.addCallback(ensureShutdown)
        elif self._avatarLoggedIn(avatarId):
            # (3) a job is running fine
            self.info('avatar named %s already running', avatarId)
            raise self._alreadyRunningError(avatarId)
        # else (4): it's new; we know of nothing with this avatarId

        self.debug('registering deferredCreate for %s', avatarId)
        self._createDeferreds[avatarId] = d
        return d

    def createSuccess(self, avatarId):
        """
        Trigger a deferred start previously registered via createStart().
        For example, a JobHeaven might call this method when a job has
        logged in and been told to start a component.

        If no create deferred is registered for the avatarId, a warning
        is logged and nothing else happens.

        @param avatarId: the id of the remote process, for example the
        avatarId of the job
        """
        self.debug('triggering create deferred for %s', avatarId)
        if avatarId not in self._createDeferreds:
            self.warning('No create deferred registered for %s', avatarId)
            return

        # unregister before firing, so callbacks see a consistent state
        d = self._createDeferreds.pop(avatarId)
        # return the avatarId the component will use to the original caller
        d.callback(avatarId)

    def createFailed(self, avatarId, exception):
        """
        Notify the caller that a create has failed, and remove the create
        from the list of pending creates.

        If no create deferred is registered for the avatarId, a warning
        is logged and nothing else happens.

        @param avatarId: the id of the remote process, for example the
        avatarId of the job
        @param exception: either an exception or a failure describing
        why the create failed.
        """
        self.debug('create deferred failed for %s', avatarId)
        if avatarId not in self._createDeferreds:
            self.warning('No create deferred registered for %s', avatarId)
            return

        # unregister before firing, so errbacks see a consistent state
        d = self._createDeferreds.pop(avatarId)
        d.errback(exception)

    def createRegistered(self, avatarId):
        """
        Check if a deferred create has been registered for the given avatarId.

        @param avatarId: the id of the remote process, for example the
        avatarId of the job

        @returns: The deferred create, if one has been registered.
        Otherwise None.
        """
        return self._createDeferreds.get(avatarId)

    def shutdownStart(self, avatarId):
        """
        Create and register a deferred that will be fired when a process
        has shut down cleanly.

        If a shutdown deferred is already registered for the avatarId,
        a warning is logged and that same deferred is returned.

        @param avatarId: the id of the remote process, for example the
        avatarId of the job

        @rtype: L{twisted.internet.defer.Deferred}
        """
        self.debug('making shutdown deferred for %s', avatarId)

        if avatarId in self._shutdownDeferreds:
            self.warning('already have a shutdown deferred for %s',
                         avatarId)
            return self._shutdownDeferreds[avatarId]

        self.debug('registering shutdown for %s', avatarId)
        d = defer.Deferred()
        self._shutdownDeferreds[avatarId] = d
        return d

    def shutdownSuccess(self, avatarId):
        """
        Trigger a callback on a deferred previously registered via
        shutdownStart(). For example, a JobHeaven would call this when a
        job for which shutdownStart() was called is reaped.

        If no shutdown deferred is registered for the avatarId, a
        warning is logged and nothing else happens.

        @param avatarId: the id of the remote process, for example the
        avatarId of the job
        """
        self.debug('triggering shutdown deferred for %s', avatarId)
        if avatarId not in self._shutdownDeferreds:
            self.warning('No shutdown deferred registered for %s', avatarId)
            return

        d = self._shutdownDeferreds.pop(avatarId)
        d.callback(avatarId)

    def shutdownRegistered(self, avatarId):
        """
        Check if a deferred shutdown has been registered for the given
        avatarId.

        @param avatarId: the id of the remote process, for example the
        avatarId of the job

        @returns: True if a deferred shutdown has been registered for
        this object, False otherwise
        """
        return avatarId in self._shutdownDeferreds

    def avatarStarted(self, avatarId):
        """
        Notify the startset that an avatar has started. If there was a
        create deferred registered for this avatar, this will cause
        createSuccess() to be called. Otherwise only a log message is
        emitted.

        @param avatarId: the id of the remote process, for example the
        avatarId of the job
        """
        if avatarId in self._createDeferreds:
            self.createSuccess(avatarId)
        else:
            self.log('avatar %s started, but we were not waiting for'
                     ' it', avatarId)

    def avatarStopped(self, avatarId, getFailure):
        """
        Notify the startset that an avatar has stopped. If there was a
        create deferred still pending for this avatar, we will call
        createFailed with the result of calling getFailure.

        Otherwise, if there was a shutdown deferred registered for this
        avatar, this will cause shutdownSuccess() to be called.

        If neither a create nor a shutdown was registered, only a debug
        message is logged.

        @param avatarId: the id of the remote process, for example the
        avatarId of the job
        @param getFailure: procedure of type avatarId -> Failure. The
        returned failure should describe the reason that the job failed.
        It is only called when a create deferred is pending.
        """
        # NOTE(review): createStart() can register a create deferred
        # while a shutdown deferred for the same avatarId is still
        # pending. In that state only the create branch below runs and
        # the shutdown deferred stays registered -- confirm that this
        # precedence is intended.
        if avatarId in self._createDeferreds:
            self.createFailed(avatarId, getFailure(avatarId))
        elif avatarId in self._shutdownDeferreds:
            self.shutdownSuccess(avatarId)
        else:
            self.debug('unknown avatar %s logged out', avatarId)
253