Package flumotion :: Package component :: Package misc :: Package porter :: Module porter
[hide private]

Source Code for Module flumotion.component.misc.porter.porter

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_porter -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import os 
 23  import random 
 24  import socket 
 25  import string 
 26  import time 
 27  from urllib2 import urlparse 
 28   
 29  from twisted.cred import portal 
 30  from twisted.internet import protocol, reactor, address, error, defer 
 31  from twisted.spread import pb 
 32  from zope.interface import implements 
 33   
 34  from flumotion.common import medium, log, messages, errors 
 35  from flumotion.common.i18n import N_, gettexter 
 36  from flumotion.component import component 
 37  from flumotion.component.component import moods 
 38  from flumotion.twisted import credentials, fdserver, checkers 
 39  from flumotion.twisted import reflect 
 40   
 41  __version__ = "$Rev$" 
 42  T_ = gettexter() 
 43   
 44   
45 -class PorterAvatar(pb.Avatar, log.Loggable):
46 """ 47 An Avatar in the porter representing a streamer 48 """ 49
50 - def __init__(self, avatarId, porter, mind):
51 self.avatarId = avatarId 52 self.porter = porter 53 54 # The underlying transport is now accessible as 55 # self.mind.broker.transport, on which we can call sendFileDescriptor 56 self.mind = mind
57
58 - def isAttached(self):
59 return self.mind != None
60
61 - def logout(self):
62 self.debug("porter client %s logging out", self.avatarId) 63 self.mind = None
64
65 - def perspective_registerPath(self, path):
66 self.log("Perspective called: registering path \"%s\"" % path) 67 self.porter.registerPath(path, self)
68
69 - def perspective_deregisterPath(self, path):
70 self.log("Perspective called: deregistering path \"%s\"" % path) 71 self.porter.deregisterPath(path, self)
72
73 - def perspective_registerPrefix(self, prefix):
74 self.log("Perspective called: registering default") 75 self.porter.registerPrefix(prefix, self)
76
77 - def perspective_deregisterPrefix(self, prefix):
78 self.log("Perspective called: deregistering default") 79 self.porter.deregisterPrefix(prefix, self)
80 81
82 -class PorterRealm(log.Loggable):
83 """ 84 A Realm within the Porter that creates Avatars for streamers logging into 85 the porter. 86 """ 87 implements(portal.IRealm) 88
89 - def __init__(self, porter):
90 """ 91 @param porter: The porter that avatars created from here should use. 92 @type porter: L{Porter} 93 """ 94 self.porter = porter
95
96 - def requestAvatar(self, avatarId, mind, *interfaces):
97 self.log("Avatar requested for avatarId %s, mind %r, interfaces %r", 98 avatarId, mind, interfaces) 99 if pb.IPerspective in interfaces: 100 avatar = PorterAvatar(avatarId, self.porter, mind) 101 return pb.IPerspective, avatar, avatar.logout 102 else: 103 raise NotImplementedError("no interface")
104 105
106 -class PorterMedium(component.BaseComponentMedium):
107
108 - def remote_getPorterDetails(self):
109 """ 110 Return the location, login username/password, and listening port 111 and interface for the porter as a tuple (path, username, 112 password, port, interface, external-interface). 113 """ 114 return (self.comp._socketPath, self.comp._username, 115 self.comp._password, self.comp._iptablesPort, 116 self.comp._interface, self.comp._external_interface)
117 118
119 -class Porter(component.BaseComponent, log.Loggable):
120 """ 121 The porter optionally sits in front of a set of streamer components. 122 The porter is what actually deals with incoming connections on a socket. 123 It decides which streamer to direct the connection to, then passes the FD 124 (along with some amount of already-read data) to the appropriate streamer. 125 """ 126 127 componentMediumClass = PorterMedium 128
129 - def init(self):
130 # We maintain a map of path -> avatar (the underlying transport is 131 # accessible from the avatar, we need this for FD-passing) 132 self._mappings = {} 133 self._prefixes = {} 134 135 self._socketlistener = None 136 137 self._socketPath = None 138 self._username = None 139 self._password = None 140 self._port = None 141 self._iptablesPort = None 142 self._porterProtocol = None 143 144 self._interface = '' 145 self._external_interface = ''
146
147 - def registerPath(self, path, avatar):
148 """ 149 Register a path as being served by a streamer represented by this 150 avatar. Will remove any previous registration at this path. 151 152 @param path: The path to register 153 @type path: str 154 @param avatar: The avatar representing the streamer to direct this path 155 to 156 @type avatar: L{PorterAvatar} 157 """ 158 self.debug("Registering porter path \"%s\" to %r" % (path, avatar)) 159 if path in self._mappings: 160 self.warning("Replacing existing mapping for path \"%s\"" % path) 161 162 self._mappings[path] = avatar
163
164 - def deregisterPath(self, path, avatar):
165 """ 166 Attempt to deregister the given path. A deregistration will only be 167 accepted if the mapping is to the avatar passed. 168 169 @param path: The path to deregister 170 @type path: str 171 @param avatar: The avatar representing the streamer being deregistered 172 @type avatar: L{PorterAvatar} 173 """ 174 if path in self._mappings: 175 if self._mappings[path] == avatar: 176 self.debug("Removing porter mapping for \"%s\"" % path) 177 del self._mappings[path] 178 else: 179 self.warning( 180 "Mapping not removed: refers to a different avatar") 181 else: 182 self.warning("Mapping not removed: no mapping found")
183
184 - def registerPrefix(self, prefix, avatar):
185 """ 186 Register a destination for all requests directed to anything beginning 187 with a specified prefix. Where there are multiple matching prefixes, 188 the longest is selected. 189 190 @param avatar: The avatar being registered 191 @type avatar: L{PorterAvatar} 192 """ 193 194 self.debug("Setting prefix \"%s\" for porter", prefix) 195 if prefix in self._prefixes: 196 self.warning("Overwriting prefix") 197 198 self._prefixes[prefix] = avatar
199
200 - def deregisterPrefix(self, prefix, avatar):
201 """ 202 Attempt to deregister a default destination for all requests not 203 directed to a specifically-mapped path. This will only succeed if the 204 default is currently equal to this avatar. 205 206 @param avatar: The avatar being deregistered 207 @type avatar: L{PorterAvatar} 208 """ 209 if prefix not in self._prefixes: 210 self.warning("Mapping not removed: no mapping found") 211 return 212 213 if self._prefixes[prefix] == avatar: 214 self.debug("Removing prefix destination from porter") 215 del self._prefixes[prefix] 216 else: 217 self.warning( 218 "Not removing prefix destination: expected avatar not found")
219
220 - def findPrefixMatch(self, path):
221 found = None 222 # TODO: Horribly inefficient. Replace with pathtree code. 223 for prefix in self._prefixes.keys(): 224 self.log("Checking: %r, %r" % (prefix, path)) 225 if (path.startswith(prefix) and 226 (not found or len(found) < len(prefix))): 227 found = prefix 228 if found: 229 return self._prefixes[found] 230 else: 231 return None
232
233 - def findDestination(self, path):
234 """ 235 Find a destination Avatar for this path. 236 @returns: The Avatar for this mapping, or None. 237 """ 238 239 if path in self._mappings: 240 return self._mappings[path] 241 else: 242 return self.findPrefixMatch(path)
243
244 - def generateSocketPath(self):
245 """ 246 Generate a socket pathname in an appropriate location 247 """ 248 # Also see worker/worker.py:_getSocketPath(), and note that 249 # this suffers from the same potential race. 250 import tempfile 251 fd, name = tempfile.mkstemp('.%d' % os.getpid(), 'flumotion.porter.') 252 os.close(fd) 253 254 return name
255
256 - def generateRandomString(self, numchars):
257 """ 258 Generate a random US-ASCII string of length numchars 259 """ 260 string = "" 261 chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" 262 for _ in range(numchars): 263 string += chars[random.randint(0, len(chars) - 1)] 264 265 return string
266
267 - def have_properties(self):
268 props = self.config['properties'] 269 270 self.fixRenamedProperties(props, 271 [('socket_path', 'socket-path')]) 272 273 # We can operate in two modes: explicitly configured (neccesary if you 274 # want to handle connections from components in other managers), and 275 # self-configured (which is sufficient for slaving only streamers 276 # within this manager 277 if 'socket-path' in props: 278 # Explicitly configured 279 self._socketPath = props['socket-path'] 280 self._username = props['username'] 281 self._password = props['password'] 282 else: 283 # Self-configuring. Use a randomly create username/password, and 284 # a socket with a random name. 285 self._username = self.generateRandomString(12) 286 self._password = self.generateRandomString(12) 287 self._socketPath = self.generateSocketPath() 288 289 self._requirePassword = props.get('require-password', True) 290 self._socketMode = props.get('socket-mode', 0666) 291 self._port = int(props['port']) 292 self._iptablesPort = int(props.get('iptables-port', self._port)) 293 self._porterProtocol = props.get('protocol', 294 'flumotion.component.misc.porter.porter.HTTPPorterProtocol') 295 self._interface = props.get('interface', '') 296 # if a config has no external-interface set, set it to the same as 297 # interface 298 self._external_interface = props.get('external-interface', 299 self._interface)
300
301 - def do_stop(self):
302 d = None 303 if self._socketlistener: 304 # stopListening() calls (via a callLater) connectionLost(), which 305 # will unlink our socket, so we don't need to explicitly delete it. 306 d = self._socketlistener.stopListening() 307 self._socketlistener = None 308 return d
309
310 - def do_setup(self):
311 # Create our combined PB-server/fd-passing channel 312 self.have_properties() 313 realm = PorterRealm(self) 314 checker = checkers.FlexibleCredentialsChecker() 315 checker.addUser(self._username, self._password) 316 if not self._requirePassword: 317 checker.allowPasswordless(True) 318 319 p = portal.Portal(realm, [checker]) 320 serverfactory = pb.PBServerFactory(p) 321 322 try: 323 # Rather than a normal listenTCP() or listenUNIX(), we use 324 # listenWith so that we can specify our particular Port, which 325 # creates Transports that we know how to pass FDs over. 326 try: 327 os.unlink(self._socketPath) 328 except OSError: 329 pass 330 331 self._socketlistener = reactor.listenWith( 332 fdserver.FDPort, self._socketPath, 333 serverfactory, mode=self._socketMode) 334 self.info("Now listening on socketPath %s", self._socketPath) 335 except error.CannotListenError, e: 336 self.warning("Failed to create socket %s" % self._socketPath) 337 m = messages.Error(T_(N_( 338 "Network error: socket path %s is not available."), 339 self._socketPath)) 340 self.addMessage(m) 341 self.setMood(moods.sad) 342 return defer.fail(errors.ComponentSetupHandledError()) 343 344 # Create the class that deals with the specific protocol we're proxying 345 # in this porter. 346 try: 347 proto = reflect.namedAny(self._porterProtocol) 348 self.debug("Created proto %r" % proto) 349 except (ImportError, AttributeError): 350 self.warning("Failed to import protocol '%s', defaulting to HTTP" % 351 self._porterProtocol) 352 proto = HTTPPorterProtocol 353 354 # And of course we also want to listen for incoming requests in the 355 # appropriate protocol (HTTP, RTSP, etc.) 356 factory = PorterProtocolFactory(self, proto) 357 try: 358 reactor.listenWith( 359 fdserver.PassableServerPort, self._port, factory, 360 interface=self._interface) 361 self.info("Now listening on interface %r on port %d", 362 self._interface, self._port) 363 except error.CannotListenError, e: 364 self.warning("Failed to listen on interface %r on port %d", 365 self._interface, self._port) 366 m = messages.Error(T_(N_( 367 "Network error: TCP port %d is not available."), self._port)) 368 self.addMessage(m) 369 self.setMood(moods.sad) 370 return defer.fail(errors.ComponentSetupHandledError())
371 372
373 -class PorterProtocolFactory(protocol.Factory):
374
375 - def __init__(self, porter, protocol):
376 self._porter = porter 377 self.protocol = protocol
378
379 - def buildProtocol(self, addr):
380 p = self.protocol(self._porter) 381 p.factory = self 382 return p
383 384
385 -class PorterProtocol(protocol.Protocol, log.Loggable):
386 """ 387 The base porter is capable of accepting HTTP-like protocols (including 388 RTSP) - it reads the first line of a request, and makes the decision 389 solely on that. 390 391 We can't guarantee that we read precisely a line, so the buffer we 392 accumulate will actually be larger than what we actually parse. 393 394 @cvar MAX_SIZE: the maximum number of bytes allowed for the first line 395 @cvar delimiters: a list of valid line delimiters I check for 396 """ 397 398 logCategory = 'porterprotocol' 399 400 # Don't permit a first line longer than this. 401 MAX_SIZE = 4096 402 403 # Timeout any client connected to the porter for longer than this. A normal 404 # client should only ever be connected for a fraction of a second. 405 PORTER_CLIENT_TIMEOUT = 30 406 407 # In fact, because we check \r, we'll never need to check for \r\n - we 408 # leave this in as \r\n is the more correct form. At the other end, this 409 # gets processed by a full protocol implementation, so being flexible hurts 410 # us not at all 411 delimiters = ['\r\n', '\n', '\r'] 412
413 - def __init__(self, porter):
414 self._buffer = '' 415 self._porter = porter 416 self.requestId = None # a string that should identify the request 417 418 self._timeoutDC = reactor.callLater(self.PORTER_CLIENT_TIMEOUT, 419 self._timeout)
420
421 - def connectionMade(self):
422 423 self.requestId = self.generateRequestId() 424 # PROBE: accepted connection 425 self.debug("[fd %5d] (ts %f) (request-id %r) accepted connection", 426 self.transport.fileno(), time.time(), self.requestId) 427 428 protocol.Protocol.connectionMade(self)
429
430 - def _timeout(self):
431 self._timeoutDC = None 432 self.debug("Timing out porter client after %d seconds", 433 self.PORTER_CLIENT_TIMEOUT) 434 self.transport.loseConnection()
435
436 - def connectionLost(self, reason):
437 if self._timeoutDC: 438 self._timeoutDC.cancel() 439 self._timeoutDC = None
440
441 - def dataReceived(self, data):
442 self._buffer = self._buffer + data 443 self.log("Got data, buffer now \"%s\"" % self._buffer) 444 # We accept more than just '\r\n' (the true HTTP line end) in the 445 # interests of compatibility. 446 for delim in self.delimiters: 447 try: 448 line, remaining = self._buffer.split(delim, 1) 449 break 450 except ValueError: 451 # We didn't find this delimiter; continue with the others. 452 pass 453 else: 454 # Failed to find a valid delimiter. 455 self.log("No valid delimiter found") 456 if len(self._buffer) > self.MAX_SIZE: 457 458 # PROBE: dropping 459 self.debug("[fd %5d] (ts %f) (request-id %r) dropping, " 460 "buffer exceeded", 461 self.transport.fileno(), time.time(), 462 self.requestId) 463 464 return self.transport.loseConnection() 465 else: 466 # No delimiter found; haven't reached the length limit yet. 467 # Wait for more data. 468 return 469 470 # Got a line. self._buffer is still our entire buffer, should be 471 # provided to the slaved process. 472 parsed = self.parseLine(line) 473 if not parsed: 474 self.log("Couldn't parse the first line") 475 return self.transport.loseConnection() 476 477 identifier = self.extractIdentifier(parsed) 478 if not identifier: 479 self.log("Couldn't find identifier in first line") 480 return self.transport.loseConnection() 481 482 if self.requestId: 483 self.log("Injecting request-id %r", self.requestId) 484 parsed = self.injectRequestId(parsed, self.requestId) 485 # Since injecting the token might have modified the parsed 486 # representation of the request, we need to reconstruct the buffer. 487 # Fortunately, we know what delimiter did we split on, what's the 488 # remaining part and that we only split the buffer in two parts 489 self._buffer = delim.join((self.unparseLine(parsed), remaining)) 490 491 # PROBE: request 492 self.debug("[fd %5d] (ts %f) (request-id %r) identifier %s", 493 self.transport.fileno(), time.time(), self.requestId, 494 identifier) 495 496 # Ok, we have an identifier. Is it one we know about, or do we have 497 # a default destination? 498 destinationAvatar = self._porter.findDestination(identifier) 499 500 if not destinationAvatar or not destinationAvatar.isAttached(): 501 if destinationAvatar: 502 self.debug("There was an avatar, but it logged out?") 503 504 # PROBE: no destination; see send fd 505 self.debug( 506 "[fd %5d] (ts %f) (request-id %r) no destination avatar found", 507 self.transport.fileno(), time.time(), self.requestId) 508 509 self.writeNotFoundResponse() 510 return self.transport.loseConnection() 511 512 # Transfer control over this FD. Pass all the data so-far received 513 # along in the same message. The receiver will push that data into 514 # the Twisted Protocol object as if it had been normally received, 515 # so it looks to the receiver like it has read the entire data stream 516 # itself. 517 518 # PROBE: send fd; see no destination and fdserver.py 519 self.debug("[fd %5d] (ts %f) (request-id %r) send fd to avatarId %s", 520 self.transport.fileno(), time.time(), self.requestId, 521 destinationAvatar.avatarId) 522 523 # TODO: Check out blocking characteristics of sendFileDescriptor, fix 524 # if it blocks. 525 try: 526 destinationAvatar.mind.broker.transport.sendFileDescriptor( 527 self.transport.fileno(), self._buffer) 528 except OSError, e: 529 self.warning("[fd %5d] failed to send FD: %s", 530 self.transport.fileno(), log.getExceptionMessage(e)) 531 self.writeServiceUnavailableResponse() 532 return self.transport.loseConnection() 533 534 # PROBE: sent fd; see no destination and fdserver.py 535 self.debug("[fd %5d] (ts %f) (request-id %r) sent fd to avatarId %s", 536 self.transport.fileno(), time.time(), self.requestId, 537 destinationAvatar.avatarId) 538 539 # After this, we don't want to do anything with the FD, other than 540 # close our reference to it - but not close the actual TCP connection. 541 # We set keepSocketAlive to make loseConnection() only call close() 542 # rather than shutdown() then close() 543 self.transport.keepSocketAlive = True 544 self.transport.loseConnection()
545
546 - def parseLine(self, line):
547 """ 548 Parse the initial line of the request. Return an object that can be 549 used to uniquely identify the stream being requested by passing it to 550 extractIdentifier, or None if the request is unreadable. 551 552 Subclasses should override this. 553 """ 554 raise NotImplementedError
555
556 - def unparseLine(self, parsed):
557 """ 558 Recreate the initial request line from the parsed representation. The 559 recreated line does not need to be exactly identical, but both 560 parsedLine(unparseLine(line)) and line should contain the same 561 information (i.e. unparseLine should not lose information). 562 563 UnparseLine has to return a valid line from the porter protocol's 564 scheme point of view (for instance, HTTP). 565 566 Subclasses should override this. 567 """ 568 raise NotImplementedError
569
570 - def extractIdentifier(self, parsed):
571 """ 572 Extract a string that uniquely identifies the requested stream from the 573 parsed representation of the first request line. 574 575 Subclasses should override this, depending on how they implemented 576 parseLine. 577 """ 578 raise NotImplementedError
579
580 - def generateRequestId(self):
581 """ 582 Return a string that will uniquely identify the request. 583 584 Subclasses should override this if they want to use request-ids and 585 also implement injectRequestId. 586 """ 587 raise NotImplementedError
588
589 - def injectRequestId(self, parsed, requestId):
590 """ 591 Take the parsed representation of the first request line and a string 592 token, return a parsed representation of the request line with the 593 request-id possibly mixed into it. 594 595 Subclasses should override this if they generate request-ids. 596 """ 597 # by default, ignore the request-id 598 return parsed
599
600 - def writeNotFoundResponse(self):
601 """ 602 Write a response indicating that the requested resource was not found 603 in this protocol. 604 605 Subclasses should override this to use the correct protocol. 606 """ 607 raise NotImplementedError
608
610 """ 611 Write a response indicating that the requested resource was 612 temporarily uavailable in this protocol. 613 614 Subclasses should override this to use the correct protocol. 615 """ 616 raise NotImplementedError
617 618
619 -class HTTPPorterProtocol(PorterProtocol):
620 scheme = 'http' 621 protos = ["HTTP/1.0", "HTTP/1.1"] 622 requestIdParameter = 'FLUREQID' 623 requestIdBitsNo = 256 624
625 - def parseLine(self, line):
626 try: 627 (method, location, proto) = map(string.strip, line.split(' ', 2)) 628 629 if proto not in self.protos: 630 return None 631 632 # Currently, we just use the URL parsing code from urllib2 633 parsed_url = urlparse.urlparse(location) 634 635 return method, parsed_url, proto 636 637 except ValueError: 638 return None
639
640 - def unparseLine(self, parsed):
641 method, parsed_url, proto = parsed 642 return ' '.join((method, urlparse.urlunparse(parsed_url), proto))
643
644 - def generateRequestId(self):
645 # Remember to return something that does not need quoting to be put in 646 # a GET parameter. This way we spare ourselves the effort of quoting in 647 # injectRequestId. 648 return hex(random.getrandbits(self.requestIdBitsNo))[2:]
649
650 - def injectRequestId(self, parsed, requestId):
651 method, parsed_url, proto = parsed 652 # assuming no need to escape the requestId, see generateRequestId 653 sep = '' 654 if parsed_url[4] != '': 655 sep = '&' 656 query_string = ''.join((parsed_url[4], 657 sep, self.requestIdParameter, '=', 658 requestId)) 659 parsed_url = (parsed_url[:4] + 660 (query_string, ) 661 + parsed_url[5:]) 662 return method, parsed_url, proto
663
664 - def extractIdentifier(self, parsed):
665 method, parsed_url, proto = parsed 666 # Currently, we just return the path part of the URL. 667 return parsed_url[2]
668
669 - def writeNotFoundResponse(self):
670 self.transport.write("HTTP/1.0 404 Not Found\r\n\r\nResource unknown")
671
673 self.transport.write("HTTP/1.0 503 Service Unavailable\r\n\r\n" 674 "Service temporarily unavailable")
675 676
677 -class RTSPPorterProtocol(HTTPPorterProtocol):
678 scheme = 'rtsp' 679 protos = ["RTSP/1.0"] 680
681 - def writeNotFoundResponse(self):
682 self.transport.write("RTSP/1.0 404 Not Found\r\n\r\nResource unknown")
683
685 self.transport.write("RTSP/1.0 503 Service Unavailable\r\n\r\n" 686 "Service temporarily unavailable")
687