Package flumotion :: Package component :: Package misc :: Package httpserver :: Package httpcached :: Module http_client
[hide private]

Source Code for Module flumotion.component.misc.httpserver.httpcached.http_client

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_component_providers -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import datetime 
 23  import cgi 
 24   
 25  from twisted.internet import defer, protocol, reactor 
 26  from twisted.python.util import InsensitiveDict 
 27  from twisted.web import http 
 28   
 29  from flumotion.common import log 
 30  from flumotion.common import errors 
 31  from flumotion.component.misc.httpserver.httpcached import common 
 32  from flumotion.component.misc.httpserver.httpcached import http_utils 
 33   
 34   
 35  LOG_CATEGORY = "stream-provider" 
 36   
 37  USER_AGENT = "FlumotionClient/0.1" 
 38   
 39   
def ts2str(ts):
    """
    Format a timestamp as an ISO 8601 string, mainly for log messages.

    @param ts: seconds since the epoch, or a false value (None, 0)
               when the time is not known.
    @return: the ISO formatted local date and time matching the
             timestamp, or "???" when ts is false.
    """
    # Guard first: an unknown time gets a recognizable placeholder.
    if not ts:
        return "???"
    moment = datetime.datetime.fromtimestamp(ts)
    return moment.isoformat()
44 45
class StreamInfo(object):
    """
    Provides information about a stream in a standard way.
    The information is retrieved by parsing HTTP headers.

    Attributes set by the constructor:
      - expires:  parsed "Expires" header, 0 if it could not be parsed,
                  None if the header is missing.
      - mtime:    parsed "Last-Modified" header, None if missing.
      - length:   total length taken from "Content-Range" if present,
                  otherwise from "Content-Length".
      - start:    first byte offset from "Content-Range", 0 otherwise.
      - size:     number of bytes expected in the response body.
      - mimeType: main value of "Content-Type", None if missing.
    """

    def __init__(self, headers):
        """
        @param headers: mapping of HTTP response header names to values;
                        lookups are done case-insensitively.
        @raise errors.FlumotionError: if the response uses the chunked
               transfer encoding, or if neither "Content-Range" nor
               "Content-Length" is present.
        """
        self.expires = None
        self.mtime = None
        self.length = 0
        self.start = 0
        self.size = 0
        self.mimeType = None

        headers = InsensitiveDict(headers)

        # Transfer-coding names are case-insensitive, so normalize
        # before comparing.
        encoding = headers.get("Transfer-Encoding", None)
        if encoding is not None and encoding.strip().lower() == 'chunked':
            raise errors.FlumotionError("Chunked transfer not supported")

        expires = headers.get("Expires", None)
        if expires is not None:
            try:
                self.expires = http.stringToDatetime(expires)
            except Exception:
                # Best effort: an unparsable value (servers commonly
                # send things like "0" or "-1") is recorded as 0.
                self.expires = 0

        lastmod = headers.get("Last-Modified", None)
        if lastmod is not None:
            self.mtime = http.stringToDatetime(lastmod)

        contentRange = headers.get("Content-Range", None)
        length = headers.get("Content-Length", None)
        if contentRange is not None:
            start, end, total = http.parseContentRange(contentRange)
            self.start = start
            self.length = total
            if length is not None:
                self.size = int(length)
            else:
                # The end offset of a content range is inclusive,
                # "bytes 0-499/1234" describes 500 bytes.
                self.size = end - start + 1
        elif length is not None:
            self.length = int(length)
            self.size = int(length)
        else:
            raise errors.FlumotionError("Can't get length/size from headers",
                                        headers)

        ctype = headers.get("Content-Type", None)
        if ctype is not None:
            self.mimeType, _pdict = cgi.parse_header(ctype)
97 98
class StreamRequester(log.Loggable):
    """
    Starts the retrieval of data streams over HTTP.

    Each call to retrieve() creates a new StreamGetter and connects it
    using the timeouts given to the constructor.
    """

    logCategory = LOG_CATEGORY

    def __init__(self, connTimeout=0, idleTimeout=0):
        """
        @param connTimeout: timeout handed to StreamGetter.connect().
        @param idleTimeout: timeout handed to the StreamGetter constructor.
        """
        self.idleTimeout = idleTimeout
        self.connTimeout = connTimeout

    def retrieve(self, consumer, url, proxyAddress=None, proxyPort=None,
                 ifModifiedSince=None, ifUnmodifiedSince=None,
                 start=None, size=None):
        """
        Log the request, then create and connect a StreamGetter for it.

        @param consumer:          object receiving the request outcome.
        @param url:               URL object of the stream to retrieve.
        @param proxyAddress:      optional address to connect to instead
                                  of the URL host.
        @param proxyPort:         optional port to connect to instead
                                  of the URL port.
        @param ifModifiedSince:   optional timestamp for a conditional
                                  request.
        @param ifUnmodifiedSince: optional timestamp for a conditional
                                  request.
        @param start:             optional offset of the first byte wanted.
        @param size:              optional number of bytes wanted.
        @return: the StreamGetter handling the request.
        """
        # Build every optional fragment of the log message separately;
        # a fragment stays empty when its parameter is not set.
        sizeText = ""
        if size:
            sizeText = " %d bytes" % size

        startText = ""
        if start:
            startText = " starting at %d" % start

        fromText = ""
        if size or start:
            fromText = " from "

        target = url.toString()

        sinceText = ""
        if ifModifiedSince:
            sinceText = " if modified since %s" % ts2str(ifModifiedSince)

        unlessText = ""
        if ifUnmodifiedSince:
            unlessText = (" if not modified since %s"
                          % ts2str(ifUnmodifiedSince))

        self.log("Requesting %s%s%s%s%s%s",
                 sizeText, startText, fromText, target,
                 sinceText, unlessText)

        getter = StreamGetter(consumer, url,
                              ifModifiedSince, ifUnmodifiedSince,
                              start, size, self.idleTimeout)
        getter.connect(proxyAddress, proxyPort, self.connTimeout)
        return getter
128 129
class StreamGetter(protocol.ClientFactory, http.HTTPClient, log.Loggable):
    """
    Retrieves a stream with a single HTTP GET request.

    The request line is sent as HTTP/1.1 (see sendCommand) together
    with a "Connection: close" header, so one instance handles exactly
    one request over one connection.

    This class is at the same time a Factory and a Protocol,
    this can be done because it's a client and in twisted
    client factories only create one protocol.

    The outcome, the stream info and stream data are forwarded
    to a common.StreamConsumer instance given at creation time.

    It supports range requests and some conditional request types
    (ifModified and ifUnmodified).
    """

    logCategory = LOG_CATEGORY

    HTTP_METHOD = 'GET'

    host = None
    port = None

    def __init__(self, consumer, url,
                 ifModifiedSince=None, ifUnmodifiedSince=None,
                 start=None, size=None, timeout=0):
        """
        @param consumer:          object notified of the request outcome.
        @param url:               URL object of the stream to retrieve.
        @param ifModifiedSince:   optional timestamp sent as
                                  "If-Modified-Since".
        @param ifUnmodifiedSince: optional timestamp sent as
                                  "If-Unmodified-Since".
        @param start:             optional offset of the first byte wanted.
        @param size:              optional number of bytes wanted.
        @param timeout:           delay between two idle checks.
        """
        self.consumer = consumer
        self.url = url

        self.ifModifiedSince = ifModifiedSince
        self.ifUnmodifiedSince = ifUnmodifiedSince

        self.start = start
        self.size = size
        self.timeout = timeout

        self.headers = {}
        self.peer = None
        self.status = None
        self.info = None

        self._connected = False
        self._canceled = False
        self._remaining = None
        self._idlecheck = None
        # Initialized here so _keepActive() is safe even if it gets
        # called before the first idle check has been scheduled.
        self._updateCount = 0

        self.logName = common.log_id(self) # To be able to track the instance

    def __repr__(self):
        return "<%s: %s>" % (type(self).__name__, self.url)

    ### Public Methods ###

    def connect(self, proxyAddress=None, proxyPort=None, timeout=0):
        """
        Open the TCP connection used for the request.

        Only the 'http' scheme is supported; for any other scheme the
        consumer is notified with a NOT_IMPLEMENTED server error and
        no connection is attempted.

        @param proxyAddress: address to connect to instead of the URL
                             hostname, if set.
        @param proxyPort:    port to connect to instead of the URL port,
                             if set.
        @param timeout:      timeout passed to reactor.connectTCP().
        """
        assert not self._connected, "Already connected"
        self._connected = True
        url = self.url
        self.host = proxyAddress or url.hostname
        self.port = proxyPort or url.port
        if url.scheme != 'http':
            msg = "URL scheme %s not implemented" % url.scheme
            self._serverError(common.NOT_IMPLEMENTED, msg)
        else:
            self.log("Connecting to %s:%s for %s",
                     self.host, self.port, self.url)
            reactor.connectTCP(self.host, self.port, self, timeout)

    def pause(self):
        """Stop reading from the transport, if not already paused."""
        if not self.paused and self.transport is not None:
            self.pauseProducing()
            self.log("Request paused for %s", self.url)

    def resume(self):
        """Start reading from the transport again, if paused."""
        if self.paused and self.transport is not None:
            self.resumeProducing()
            self.log("Request resumed for %s", self.url)

    def cancel(self):
        """
        Abort the request.

        The connection is closed if there is one, the idle check is
        stopped, and the request is flagged as canceled so the
        following connectionLost() does not report a server error.
        """
        if self._connected and self.transport is not None:
            self.transport.loseConnection()
        self._cancelIdleCheck()
        self.log("Request canceled for %s", self.url)
        self._canceled = True

    ### Overridden Methods ###

    def buildProtocol(self, addr):
        # The factory is its own protocol, so it can only be built once.
        assert self.peer is None, "Protocol already built"
        self.peer = addr
        return self

    def clientConnectionFailed(self, connector, reason):
        self._serverError(common.SERVER_UNAVAILABLE, reason.getErrorMessage())

    def connectionMade(self):
        """Send the request line and headers, then start the idle check."""
        self.log("Connection made for %s", self.url)
        self.sendCommand(self.HTTP_METHOD, self.url.location)
        self.sendHeader('Host', self.url.host)
        self.sendHeader('User-Agent', USER_AGENT)
        self.sendHeader('Connection', "close") # Pipeline not yet supported

        if self.ifModifiedSince:
            datestr = http.datetimeToString(self.ifModifiedSince)
            self.sendHeader('If-Modified-Since', datestr)

        if self.ifUnmodifiedSince:
            datestr = http.datetimeToString(self.ifUnmodifiedSince)
            self.sendHeader('If-Unmodified-Since', datestr)

        if self.start or self.size:
            start = self.start or 0
            if self.size:
                # Range ends are inclusive. Computed explicitly so that
                # an end offset of 0 (first byte only) is not mistaken
                # for "no end" and turned into an open-ended range.
                end = start + self.size - 1
                rangeSpecs = "bytes=%s-%s" % (start, end)
            else:
                rangeSpecs = "bytes=%s-" % (start, )
            self.sendHeader('Range', rangeSpecs)

        self.endHeaders()

        self._resetIdleCheck()

    def connectionLost(self, reason):
        """
        Finalize the response and report an unexpected disconnection.

        When the stream has completed, the consumer was already released
        by handleResponseEnd(), so the server error is not delivered.
        """
        self.log("Connection lost for %s", self.url)
        self.handleResponseEnd()
        if not self._canceled:
            self._serverError(common.SERVER_DISCONNECTED,
                              reason.getErrorMessage())

    def handleStatus(self, version, status_str, message):
        """
        Map the HTTP status code to the matching consumer notification.

        Success statuses are accepted silently; every other status
        triggers exactly one notification.
        """
        self._keepActive()
        status = int(status_str)
        self.status = status

        if status in (http.OK, http.NO_CONTENT, http.PARTIAL_CONTENT):
            return

        # A single elif chain, so a status handled by one branch can
        # never fall through to the generic "unsupported" branch.
        if status == http.REQUESTED_RANGE_NOT_SATISFIABLE:
            self._serverError(common.RANGE_NOT_SATISFIABLE,
                              "HTTP range not satisfiable")
        elif status == http.NOT_MODIFIED:
            self._conditionFail(common.STREAM_NOT_MODIFIED,
                                "Stream not modified")
        elif status == http.PRECONDITION_FAILED:
            self._conditionFail(common.STREAM_MODIFIED, "Stream Modified")
        elif status == http.NOT_FOUND:
            self._streamNotAvailable(common.STREAM_NOTFOUND,
                                     "Resource Not Found")
        elif status == http.FORBIDDEN:
            self._streamNotAvailable(common.STREAM_FORBIDDEN,
                                     "Resource Forbidden")
        elif status in (http.MOVED_PERMANENTLY, http.FOUND):
            self._serverError(common.NOT_IMPLEMENTED,
                              "HTTP redirection not supported")
        else:
            self._serverError(common.NOT_IMPLEMENTED,
                              "Unsupported HTTP response: %s (%s)"
                              % (message, status))

    def handleHeader(self, key, val):
        self._keepActive()
        self.headers[key] = val

    def handleEndHeaders(self):
        """Build the StreamInfo from the headers and hand it to the consumer."""
        self._keepActive()
        self.info = StreamInfo(self.headers)
        if self.size and self.size < self.info.size:
            self.warning("Response size bigger than the requested size, "
                         "expecting %s bytes and response length is %s",
                         self.size, self.info.size)
        self._remaining = self.info.size
        self._onInfo(self.info)

    def handleResponsePart(self, data):
        """Account for the received bytes and forward them to the consumer."""
        self._keepActive()
        size = len(data)
        if self._remaining > 0 and self._remaining < size:
            self.warning("More than %s bytes have been received",
                         self.info.size)
        self._remaining -= size
        self._onData(data)

    def handleResponseEnd(self):
        """
        Notify the consumer if all expected bytes have been received,
        otherwise only log that the request is incomplete.
        """
        if self.info is None:
            # Headers were never completely received.
            self.log("Incomplete request %s", self.url.toString())
            return

        if self._remaining == 0:
            self.log("Request done, got %d bytes starting at %d from %s, "
                     "last modified on %s", self.info.size,
                     self.info.start, self.url.toString(),
                     ts2str(self.info.mtime))
            self._streamDone()
            return

        self.log("Incomplete request, missing %d bytes from the expected "
                 "%d bytes starting at %d from %s", self._remaining,
                 self.info.size, self.info.start, self.url.toString())

    def sendCommand(self, command, path):
        # We want HTTP/1.1 for conditional GET and range requests
        self.transport.write('%s %s HTTP/1.1\r\n' % (command, path))

    ### Private Methods ###

    def _keepActive(self):
        # Record activity; looked at by the next idle check.
        self._updateCount += 1

    def _resetIdleCheck(self):
        # NOTE(review): with a timeout of 0 (the constructor default) the
        # check is scheduled to run right away, presumably before any
        # response data arrives; confirm callers always give a
        # positive timeout.
        self._cancelIdleCheck()
        self._idlecheck = reactor.callLater(self.timeout, self._onIdleCheck)

    def _cancelIdleCheck(self):
        if self._idlecheck:
            self._idlecheck.cancel()
        self._idlecheck = None
        self._updateCount = 0

    def _onIdleCheck(self):
        # No activity recorded since the check was scheduled means
        # the server timed out; otherwise schedule the next check.
        self._idlecheck = None
        if not self._updateCount:
            self._onTimeout()
        else:
            self._resetIdleCheck()

    def _onTimeout(self):
        self._idlecheck = None
        self._serverError(common.SERVER_TIMEOUT, "Server timeout")

    def _cancel(self):
        # Stop the idle check, close the connection and release the
        # consumer so it does not receive any further notification.
        self._cancelIdleCheck()
        if self.consumer:
            if self.transport:
                self.transport.loseConnection()
            self.consumer = None

    def _serverError(self, code, message):
        if self.consumer:
            self.consumer.serverError(self, code, message)
            self._cancel()

    def _conditionFail(self, code, message):
        if self.consumer:
            self.consumer.conditionFail(self, code, message)
            self._cancel()

    def _streamNotAvailable(self, code, message):
        if self.consumer:
            self.consumer.streamNotAvailable(self, code, message)
            self._cancel()

    def _onInfo(self, info):
        if self.consumer:
            self.consumer.onInfo(self, info)

    def _onData(self, data):
        if self.consumer:
            self.consumer.onData(self, data)

    def _streamDone(self):
        if self.consumer:
            self.consumer.streamDone(self)
            self._cancel()
def addarg(d, a):
    """
    Parse a "key=value" command line argument and store it in d.

    The value "None" is stored as None, a value that parses as an
    integer is stored as an int, anything else is kept as a string.

    @param d: dictionary updated in place.
    @param a: argument of the form "key=value"; only the first '='
              separates the key from the value.
    @raise ValueError: if the argument does not contain any '='.
    """
    k, v = a.split('=', 1)
    if v == 'None':
        # Return right away, otherwise the int conversion below fails
        # and overwrites the value with the string 'None'.
        d[k] = None
        return
    try:
        d[k] = int(v)
    except ValueError:
        d[k] = v


if __name__ == "__main__":
    # Manual test: python http_client.py url=http://host/path [key=value ...]
    # The extra key=value pairs are passed to StreamRequester.retrieve().
    import sys

    kwargs = {}
    for a in sys.argv[1:]:
        addarg(kwargs, a)

    url = kwargs.pop('url')

    class DummyConsumer(object):
        """Consumer printing every notification; stops the reactor at the end."""

        # Single-argument print(...) calls produce the same output with
        # the print statement and with the print function.

        def serverError(self, getter, code, message):
            print("Failure: %s (%d)" % (message, code))
            reactor.stop()

        def conditionFail(self, getter, code, message):
            print("Condition: %s (%d)" % (message, code))
            reactor.stop()

        def streamNotAvailable(self, getter, code, message):
            print(message)
            reactor.stop()

        def streamDone(self, getter):
            print("Finished")
            reactor.stop()

        def onInfo(self, getter, info):
            exp = info.expires and http.datetimeToString(info.expires)
            mod = info.mtime and http.datetimeToString(info.mtime)
            print("Found, Exp: %s Mod: %s" % (exp, mod))
            print("Len: %s Start: %s Size: %s"
                  % (info.length, info.start, info.size))

        def onData(self, getter, data):
            #print "Data (%d)" % len(data)
            pass

    consumer = DummyConsumer()
    requester = StreamRequester(5000, 5000)
    requester.retrieve(consumer, http_utils.Url.fromString(url), **kwargs)
    reactor.run()