1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import os
23 import socket
24 import time
25 import errno
26 import string
27 import resource
28 import fcntl
29
30 import gst
31
32 try:
33 from twisted.web import http
34 except ImportError:
35 from twisted.protocols import http
36
37 from twisted.web import server, resource as web_resource
38 from twisted.internet import reactor, defer
39 from twisted.python import reflect
40
41 from flumotion.configure import configure
42 from flumotion.common import errors
43
44 from flumotion.common import common, log, keycards
45
46 from flumotion.component.base import http as httpbase
47
# Public API of this module: the streaming resource and the sink streamer
# component (the latter is presumably defined later in the file — not
# visible in this chunk).
48 __all__ = ['HTTPStreamingResource', 'MultifdSinkStreamer']
49 __version__ = "$Rev$"
50
# Server identity advertised in the HTTP 'Server' response header.
51 HTTP_NAME = 'FlumotionHTTPServer'
52 HTTP_VERSION = configure.version
53
# %-format template for minimal HTML error pages; expects a dict with
# integer 'code' and string 'error' keys.
54 ERROR_TEMPLATE = """<!doctype html public "-//IETF//DTD HTML 2.0//EN">
55 <html>
56 <head>
57 <title>%(code)d %(error)s</title>
58 </head>
59 <body>
60 <h2>%(code)d %(error)s</h2>
61 </body>
62 </html>
63 """
64
# Full 'Server' header value, e.g. 'FlumotionHTTPServer/<version>'.
65 HTTP_SERVER = '%s/%s' % (HTTP_NAME, HTTP_VERSION)
66
67
68
69
71
# NOTE(review): the enclosing class statement was lost in extraction; these
# are class-level attributes of a twisted.web resource (see isLeaf below),
# presumably HTTPStreamingResource — confirm against the original file.
# Number of file descriptors kept in reserve (logging, listening sockets,
# etc.) when deriving the client limit from RLIMIT_NOFILE; used by
# getMaxAllowedClients further down.
72 __reserve_fds__ = 50
73
# Category string used by flumotion.common.log for this object's messages.
74 logCategory = 'httpstreamer'
75
76
77
78
# twisted.web: this resource serves requests itself; no child traversal.
79 isLeaf = True
80
82 """
83 @param streamer: L{MultifdSinkStreamer}
84 """
85 self.streamer = streamer
86 self.httpauth = httpauth
87
88 self._requests = {}
89
90 self.maxclients = self.getMaxAllowedClients(-1)
91 self.maxbandwidth = -1
92
93
94 self._redirectOnFull = None
95
96 self._removing = {}
97
98 socket = 'flumotion.component.plugs.request.RequestLoggerPlug'
99 self.loggers = streamer.plugs.get(socket, [])
100
101 socket = \
102 'flumotion.component.plugs.requestmodifier.RequestModifierPlug'
103 self.modifiers = streamer.plugs.get(socket, [])
104
105 self.logfilter = None
106
107 web_resource.Resource.__init__(self)
108
117
# NOTE(review): the method header (original line 118, presumably
# 'def removeAllClients(self):') was lost in extraction.
119 """
120 Start to remove all the clients connected (this will complete
121 asynchronously from another thread)
122
123 Returns a deferred that will fire once they're all removed.
124 """
125 l = []
126 for fd in self._requests:
# Park a deferred per fd; the client-removed callback pops and fires it.
127 self._removing[fd] = defer.Deferred()
128 l.append(self._removing[fd])
# Asks the sink element to drop the fd; completion is asynchronous.
129 self.streamer.remove_client(fd)
130
131 return defer.DeferredList(l)
132
# NOTE(review): three method bodies follow whose 'def' header lines
# (original lines 133, 136, 139) were lost in extraction; names inferred
# from behavior — confirm against the original file.
# Presumably setRoot(self, path): mount this resource at 'path' under itself.
134 self.putChild(path, self)
135
# Presumably setLogFilter(self, logfilter): install the access-log IP filter.
137 self.logfilter = logfilter
138
# rotateLogs:
140 """
141 Close the logfile, then reopen using the previous logfilename
142 """
143 for logger in self.loggers:
144 self.debug('rotating logger %r' % logger)
145 logger.rotate()
146
def logWrite(self, fd, ip, request, stats):
    """
    Notify all configured request-logger plugs of a completed session.

    @param fd:      file descriptor the client was served on
    @param ip:      client IP address, as a string
    @param request: the finished request
    @type  request: L{twisted.protocols.http.Request}
    @param stats:   per-client statistics from multifdsink, or None/empty;
                    index 0 is bytes sent, index 3 is time connected in
                    nanoseconds — TODO confirm against multifdsink docs

    @returns: a L{defer.DeferredList} firing when every logger has
              processed the 'http_session_completed' event.
    """
    headers = request.getAllHeaders()

    # No stats means the sink never reported on this client; log -1 so
    # downstream consumers can tell "unknown" from a real zero.
    if stats:
        bytes_sent = stats[0]
        time_connected = int(stats[3] / gst.SECOND)
    else:
        bytes_sent = -1
        time_connected = -1

    args = {'ip': ip,
            'time': time.gmtime(),
            'method': request.method,
            'uri': request.uri,
            'username': '-',
            'get-parameters': request.args,
            'clientproto': request.clientproto,
            'response': request.code,
            'bytes-sent': bytes_sent,
            'referer': headers.get('referer', None),
            'user-agent': headers.get('user-agent', None),
            'time-connected': time_connected}

    # Loggers may do deferred work (e.g. remote logging); wrap each call
    # so callers can wait for all of them to finish.
    l = []
    for logger in self.loggers:
        l.append(defer.maybeDeferred(
            logger.event, 'http_session_completed', args))

    return defer.DeferredList(l)
177
# NOTE(review): three setter-method bodies follow whose 'def' header lines
# (original lines 178, 184, 188) were lost in extraction; names inferred —
# confirm against the original file.
# Presumably setUserLimit(self, limit): cap concurrent clients, clamped by
# the process fd limit via getMaxAllowedClients.
179 self.info('setting maxclients to %d' % limit)
180 self.maxclients = self.getMaxAllowedClients(limit)
181
# The effective value may be lower than requested; log what we ended up with.
182 self.info('set maxclients to %d' % self.maxclients)
183
# Presumably setBandwidthLimit(self, limit): cap total streamed bandwidth.
185 self.maxbandwidth = limit
186 self.info("set maxbandwidth to %d", self.maxbandwidth)
187
# Presumably setRedirectionOnLimits(self, url): where to send clients
# rejected because the server is full.
189 self._redirectOnFull = url
190
191
192
# NOTE(review): the method header (original line 193, presumably
# 'def _writeHeaders(self, request):') was lost in extraction.
194 """
195 Write out the HTTP headers for the incoming HTTP request.
196
197 @rtype: boolean
198 @returns: whether or not the file descriptor can be used further.
199 """
200 fd = request.transport.fileno()
201 fdi = request.fdIncoming
202
203
# fileno() returns -1 once the transport is disconnected.
204 if fd == -1:
205 self.info('[fd %5d] Client gone before writing header' % fdi)
206
207 return False
# Guard against fd reuse: the transport's current fd must still be the
# one the request arrived on.
208 if fd != request.fdIncoming:
209 self.warning('[fd %5d] does not match current fd %d' % (fdi, fd))
210
211 return False
212
213 content = self.streamer.get_content_type()
214 request.setHeader('Server', HTTP_SERVER)
215 request.setHeader('Date', http.datetimeToString())
216 request.setHeader('Connection', 'close')
# NOTE(review): the second Cache-Control call presumably replaces the
# first (twisted setHeader overwrites) — confirm intent.
217 request.setHeader('Cache-Control', 'no-cache')
218 request.setHeader('Cache-Control', 'private')
219 request.setHeader('Content-type', content)
220
221
# Let request-modifier plugs adjust headers before we serialize them.
222 for modifier in self.modifiers:
223 modifier.modify(request)
224
225
# Serialize headers by hand: the response is written with os.write below,
# bypassing twisted's normal response machinery.
226 headers = []
227 for name, value in request.headers.items():
228 headers.append('%s: %s\r\n' % (name.capitalize(), value))
229 for cookie in request.cookies:
230 headers.append('%s: %s\r\n' % ("Set-Cookie", cookie))
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245 try:
246
247
248
249
# Raw write of the status line + headers straight to the socket fd.
250 os.write(fd, 'HTTP/1.0 200 OK\r\n%s\r\n' % ''.join(headers))
251
# Tell twisted the response has started so it won't write its own headers.
252 request.startedWriting = True
253 return True
# Python 2 except syntax; unpacks (errno, strerror) from the OSError.
254 except OSError, (no, s):
255 if no == errno.EBADF:
256 self.info('[fd %5d] client gone before writing header' % fd)
257 elif no == errno.ECONNRESET:
258 self.info(
259 '[fd %5d] client reset connection writing header' % fd)
260 else:
261 self.info(
262 '[fd %5d] unhandled write error when writing header: %s'
263 % (fd, s))
264
265 del request
266 return False
267
# NOTE(review): the method header (original line 268, presumably
# 'def isReady(self):') was lost in extraction. Ready means the streamer
# has negotiated caps, i.e. there is a stream to serve.
269 if self.streamer.caps == None:
270 self.debug('We have no caps yet')
271 return False
272
273 return True
274
276 """
277 maximum number of allowed clients based on soft limit for number of
278 open file descriptors and fd reservation. Increases soft limit to
279 hard limit if possible.
280 """
281 (softmax, hardmax) = resource.getrlimit(resource.RLIMIT_NOFILE)
282 import sys
283 version = sys.version_info
284
285 if maxclients != -1:
286 neededfds = maxclients + self.__reserve_fds__
287
288
289
290
291 if version[:3] == (2, 4, 3) and \
292 not hasattr(socket, "has_2_4_3_patch"):
293 self.warning(
294 'Setting hardmax to 1024 due to python 2.4.3 bug')
295 hardmax = 1024
296
297 if neededfds > softmax:
298 lim = min(neededfds, hardmax)
299 resource.setrlimit(resource.RLIMIT_NOFILE, (lim, hardmax))
300 return lim - self.__reserve_fds__
301 else:
302 return maxclients
303 else:
304 return softmax - self.__reserve_fds__
305
# NOTE(review): the method header (original line 306, presumably
# 'def reachedServerLimits(self):') was lost in extraction. Returns True
# when either the client-count or the bandwidth cap would be exceeded by
# one more client.
307 if self.maxclients >= 0 and len(self._requests) >= self.maxclients:
308 return True
309 elif self.maxbandwidth >= 0:
# Estimate: (current clients + the candidate) * current stream bitrate.
310
311 if ((len(self._requests) + 1) *
312 self.streamer.getCurrentBitrate() >= self.maxbandwidth):
313 return True
314 return False
315
317 """
318 Add a request, so it can be used for statistics.
319
320 @param request: the request
321 @type request: twisted.protocol.http.Request
322 """
323
324 fd = request.transport.fileno()
325 self._requests[fd] = request
326
328 """
329 Returns whether we want to log a request from this IP; allows us to
330 filter requests from automated monitoring systems.
331 """
332 if self.logfilter:
333 return not self.logfilter.isInRange(ip)
334 else:
335 return True
336
338 """
339 Removes a request and add logging.
340 Note that it does not disconnect the client; it is called in reaction
341 to a client disconnecting.
342 It also removes the keycard if one was created.
343
344 @param request: the request
345 @type request: L{twisted.protocols.http.Request}
346 @param fd: the file descriptor for the client being removed
347 @type fd: L{int}
348 @param stats: the statistics for the removed client
349 @type stats: GValueArray
350 """
351
352
353 self.debug('[fd %5d] (ts %f) finishing request %r',
354 request.transport.fileno(), time.time(), request)
355
356 ip = request.getClientIP()
357 if self._logRequestFromIP(ip):
358 d = self.logWrite(fd, ip, request, stats)
359 else:
360 d = defer.succeed(True)
361 self.info('[fd %5d] Client from %s disconnected' % (fd, ip))
362
363
364
365
366 self.httpauth.cleanupAuth(fd)
367
368 self.debug('[fd %5d] (ts %f) closing transport %r', fd, time.time(),
369 request.transport)
370
371
372
373 del self._requests[fd]
374 request.transport.loseConnection()
375
376 self.debug('[fd %5d] closed transport %r' % (fd, request.transport))
377
378 def _done(_):
379 if fd in self._removing:
380 self.debug("client is removed; firing deferred")
381 removeD = self._removing.pop(fd)
382 removeD.callback(None)
383
384
385 self.debug('[fd %5d] (ts %f) finished request %r',
386 fd, time.time(), request)
387
388 d.addCallback(_done)
389 return d
390
407
408
409
410
411
# NOTE(review): this is the request-rendering path ('_render', per the
# render_GET/render_HEAD aliases below), but the extraction dropped large
# spans here (original lines 412-440, 446-463, and the 'def' header) —
# the visible fragments are only part of the method. Confirm against the
# original file before relying on any of this.
441
# If the stream has no caps yet, tell twisted we'll answer later.
443 self.debug('Not sending data, it\'s not ready')
444 return server.NOT_DONE_YET
445
464
466
467 fdi = request.fdIncoming
# Hand-write the response headers; bail out if the fd is already dead.
468 if not self._writeHeaders(request):
469 self.debug("[fd %5d] not adding as a client" % fdi)
470 return
471
472
473
474
475
476
477
478
479
480
481
482 fd = fdi
# From here on the fd is fed by multifdsink, not by twisted's reactor.
483 self.debug("[fd %5d] taking away from Twisted" % fd)
484 reactor.removeReader(request.transport)
485
486
487
# Sanity-check that the fd is still open before handing it to the sink
# (F_GETFL raises EBADF on a closed descriptor).
488
489 try:
490 fcntl.fcntl(fd, fcntl.F_GETFL)
# Python 2 except syntax.
491 except IOError, e:
492 if e.errno == errno.EBADF:
493 self.warning("[fd %5d] is not actually open, ignoring" % fd)
494 else:
495 self.warning("[fd %5d] error during check: %s (%d)" % (
496 fd, e.strerror, e.errno))
497 return
498
499 self._addClient(request)
500
501
# Hand the descriptor to the GStreamer sink, which streams to it directly.
502 self.streamer.add_client(fd)
503 ip = request.getClientIP()
504
505
506 self.debug('[fd %5d] (ts %f) started request %r',
507 fd, time.time(), request)
508
509 self.info('[fd %5d] Started streaming to %s' % (fd, ip))
510
# GET and HEAD are served by the same renderer.
511 render_GET = _render
512 render_HEAD = _render
513
514
515 -class HTTPRoot(web_resource.Resource, log.Loggable):
530