1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import os
23 import socket
24 import time
25 import errno
26 import string
27 import resource
28 import fcntl
29
30 import gst
31
32 try:
33 from twisted.web import http
34 except ImportError:
35 from twisted.protocols import http
36
37 from twisted.web import server, resource as web_resource
38 from twisted.internet import reactor, defer
39 from twisted.python import reflect
40
41 from flumotion.configure import configure
42 from flumotion.common import errors
43
44 from flumotion.common import common, log, keycards
45
46 from flumotion.component.base import http as httpbase
47
48 __all__ = ['HTTPStreamingResource', 'MultifdSinkStreamer']
49
50 HTTP_NAME = 'FlumotionHTTPServer'
51 HTTP_VERSION = configure.version
52
53 ERROR_TEMPLATE = """<!doctype html public "-//IETF//DTD HTML 2.0//EN">
54 <html>
55 <head>
56 <title>%(code)d %(error)s</title>
57 </head>
58 <body>
59 <h2>%(code)d %(error)s</h2>
60 </body>
61 </html>
62 """
63
64 HTTP_SERVER = '%s/%s' % (HTTP_NAME, HTTP_VERSION)
65
66
68
69 __reserve_fds__ = 50
70
71 logCategory = 'httpstreamer'
72
73
74
75
76 isLeaf = True
77
79 """
80 @param streamer: L{MultifdSinkStreamer}
81 """
82 self.streamer = streamer
83 self.httpauth = httpauth
84
85 self._requests = {}
86
87 self.maxclients = self.getMaxAllowedClients(-1)
88 self.maxbandwidth = -1
89
90
91 self._redirectOnFull = None
92
93 self._removing = {}
94
95 self.loggers = \
96 streamer.plugs['flumotion.component.plugs.loggers.Logger']
97
98 self.logfilter = None
99
100 web_resource.Resource.__init__(self)
101
103
104
105 if fd in self._requests:
106 request = self._requests[fd]
107 self._removeClient(request, fd, stats)
108 else:
109 self.warning('[fd %5d] not found in _requests' % fd)
110
112 """
113 Start to remove all the clients connected (this will complete
114 asynchronously from another thread)
115
116 Returns a deferred that will fire once they're all removed.
117 """
118 l = []
119 for fd in self._requests:
120 self._removing[fd] = defer.Deferred()
121 l.append(self._removing[fd])
122 self.streamer.remove_client(fd)
123
124 return defer.DeferredList(l)
125
127 self.putChild(path, self)
128
130 self.logfilter = logfilter
131
133 """
134 Close the logfile, then reopen using the previous logfilename
135 """
136 for logger in self.loggers:
137 self.debug('rotating logger %r' % logger)
138 logger.rotate()
139
def logWrite(self, fd, ip, request, stats):
    """
    Report a finished HTTP session to every logger in self.loggers.

    @param fd:      the file descriptor of the client (not used here)
    @param ip:      the IP address of the client, logged as 'ip'
    @param request: the request that was served; its method, uri, args,
                    clientproto, code and headers are logged
    @param stats:   the statistics for the client, or a false value if
                    there are none; item 0 is logged as 'bytes-sent' and
                    item 3, divided by gst.SECOND and truncated to an
                    int, as 'time-connected'

    @returns: a DeferredList over the results of calling each logger's
              event method with 'http_session_completed'
    """
    # -1 is what gets logged for both counters when no stats were given.
    sent = -1
    connected = -1
    if stats:
        sent = stats[0]
        connected = int(stats[3] / gst.SECOND)

    requestHeaders = request.getAllHeaders()

    event = {
        'ip': ip,
        'time': time.gmtime(),
        'method': request.method,
        'uri': request.uri,
        # no user name is known at this point, so a dash is logged
        'username': '-',
        'get-parameters': request.args,
        'clientproto': request.clientproto,
        'response': request.code,
        'bytes-sent': sent,
        'referer': requestHeaders.get('referer', None),
        'user-agent': requestHeaders.get('user-agent', None),
        'time-connected': connected,
    }

    # maybeDeferred lets a logger answer either directly or with a
    # deferred; the same event dict is handed to every logger.
    pending = [defer.maybeDeferred(logger.event,
                                   'http_session_completed', event)
               for logger in self.loggers]

    return defer.DeferredList(pending)
170
172 self.info('setting maxclients to %d' % limit)
173 self.maxclients = self.getMaxAllowedClients(limit)
174
175 self.info('set maxclients to %d' % self.maxclients)
176
178 self.maxbandwidth = limit
179 self.info("set maxbandwidth to %d", self.maxbandwidth)
180
182 self._redirectOnFull = url
183
184
186 """
187 Write out the HTTP headers for the incoming HTTP request.
188
189 @rtype: boolean
190 @returns: whether or not the file descriptor can be used further.
191 """
192 fd = request.transport.fileno()
193 fdi = request.fdIncoming
194
195
196 if fd == -1:
197 self.info('[fd %5d] Client gone before writing header' % fdi)
198
199 return False
200 if fd != request.fdIncoming:
201 self.warning('[fd %5d] does not match current fd %d' % (fdi, fd))
202
203 return False
204
205 headers = []
206
207 def setHeader(field, name):
208 headers.append('%s: %s\r\n' % (field, name))
209
210
211 content = self.streamer.get_content_type()
212 setHeader('Server', HTTP_SERVER)
213 setHeader('Date', http.datetimeToString())
214 setHeader('Cache-Control', 'no-cache')
215 setHeader('Cache-Control', 'private')
216 setHeader('Content-type', content)
217
218
219
220
221
222
223
224
225
226
227
228
229
230 try:
231
232
233
234
235 os.write(fd, 'HTTP/1.0 200 OK\r\n%s\r\n' % ''.join(headers))
236
237 request.startedWriting = True
238 return True
239 except OSError, (no, s):
240 if no == errno.EBADF:
241 self.info('[fd %5d] client gone before writing header' % fd)
242 elif no == errno.ECONNRESET:
243 self.info('[fd %5d] client reset connection writing header' % fd)
244 else:
245 self.info('[fd %5d] unhandled write error when writing header: %s' % (fd, s))
246
247 del request
248 return False
249
251 if self.streamer.caps == None:
252 self.debug('We have no caps yet')
253 return False
254
255 return True
256
258 """
259 maximum number of allowed clients based on soft limit for number of
260 open file descriptors and fd reservation. Increases soft limit to
261 hard limit if possible.
262 """
263 (softmax, hardmax) = resource.getrlimit(resource.RLIMIT_NOFILE)
264 import sys
265 version = sys.version_info
266
267 if maxclients != -1:
268 neededfds = maxclients + self.__reserve_fds__
269
270
271 if version[:3] == (2,4,3) and not hasattr(socket,"has_2_4_3_patch"):
272 hardmax = 1024
273
274 if neededfds > softmax:
275 lim = min(neededfds, hardmax)
276 resource.setrlimit(resource.RLIMIT_NOFILE, (lim, hardmax))
277 return lim - self.__reserve_fds__
278 else:
279 return maxclients
280 else:
281 return softmax - self.__reserve_fds__
282
284 if self.maxclients >= 0 and len(self._requests) >= self.maxclients:
285 return True
286 elif self.maxbandwidth >= 0:
287
288 if ((len(self._requests) + 1) *
289 self.streamer.getCurrentBitrate() >= self.maxbandwidth):
290 return True
291 return False
292
294 """
295 Add a request, so it can be used for statistics.
296
297 @param request: the request
298 @type request: twisted.protocol.http.Request
299 """
300
301 fd = request.transport.fileno()
302 self._requests[fd] = request
303
305 """
306 Returns whether we want to log a request from this IP; allows us to
307 filter requests from automated monitoring systems.
308 """
309 if self.logfilter:
310 return not self.logfilter.isInRange(ip)
311 else:
312 return True
313
315 """
316 Removes a request and add logging.
317 Note that it does not disconnect the client; it is called in reaction
318 to a client disconnecting.
319 It also removes the keycard if one was created.
320
321 @param request: the request
322 @type request: L{twisted.protocols.http.Request}
323 @param fd: the file descriptor for the client being removed
324 @type fd: L{int}
325 @param stats: the statistics for the removed client
326 @type stats: GValueArray
327 """
328
329 ip = request.getClientIP()
330 if self._logRequestFromIP(ip):
331 d = self.logWrite(fd, ip, request, stats)
332 else:
333 d = defer.succeed(True)
334 self.info('[fd %5d] Client from %s disconnected' % (fd, ip))
335
336
337
338
339 self.httpauth.cleanupAuth(fd)
340
341 self.debug('[fd %5d] closing transport %r' % (fd, request.transport))
342
343
344
345 del self._requests[fd]
346 request.transport.loseConnection()
347
348 self.debug('[fd %5d] closed transport %r' % (fd, request.transport))
349
350 def _done(_):
351 if fd in self._removing:
352 self.debug("client is removed; firing deferred")
353 removeD = self._removing.pop(fd)
354 removeD.callback(None)
355 d.addCallback(_done)
356 return d
357
369
370
371
372
398
400 self.debug('Not sending data, it\'s not ready')
401 return server.NOT_DONE_YET
402
421
423
424 fdi = request.fdIncoming
425 if not self._writeHeaders(request):
426 self.debug("[fd %5d] not adding as a client" % fdi)
427 return
428 self._addClient(request)
429
430
431
432
433
434
435
436
437 fd = fdi
438 self.debug("taking away [fd %5d] from Twisted" % fd)
439 reactor.removeReader(request.transport)
440
441
442
443
444 try:
445 fcntl.fcntl(fd, fcntl.F_GETFL)
446 except IOError, e:
447 if e.errno == errno.EBADF:
448 self.warning("[fd %5d] is not actually open, ignoring" % fd)
449 else:
450 self.warning("[fd %5d] error during check: %s (%d)" % (
451 fd, e.strerror, e.errno))
452 return
453
454
455 self.streamer.add_client(fd)
456 ip = request.getClientIP()
457
458 self.info('[fd %5d] Started streaming to %s' % (fd, ip))
459
460 render_GET = _render
461 render_HEAD = _render
462
463 -class HTTPRoot(web_resource.Resource, log.Loggable):
477