Package flumotion :: Package component :: Package consumers :: Package httpstreamer :: Module resources
[hide private]

Source Code for Module flumotion.component.consumers.httpstreamer.resources

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_http -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import os 
 23  import socket 
 24  import time 
 25  import errno 
 26  import string 
 27  import resource 
 28  import fcntl 
 29   
 30  import gst 
 31   
 32  try: 
 33      from twisted.web import http 
 34  except ImportError: 
 35      from twisted.protocols import http 
 36   
 37  from twisted.web import server, resource as web_resource 
 38  from twisted.internet import reactor, defer 
 39  from twisted.python import reflect 
 40   
 41  from flumotion.configure import configure 
 42  from flumotion.common import errors 
 43   
 44  from flumotion.common import common, log, keycards 
 45   
 46  from flumotion.component.base import http as httpbase 
 47   
# Names exported by a star-import of this module.
# NOTE(review): 'MultifdSinkStreamer' is not defined in the part of this
# module visible here, while HTTPRoot is defined but not exported -- confirm
# that the listed name really exists in this module, otherwise
# "from ... import *" will fail.
__all__ = ['HTTPStreamingResource', 'MultifdSinkStreamer']

# Product name and version; combined into HTTP_SERVER below.
HTTP_NAME = 'FlumotionHTTPServer'
HTTP_VERSION = configure.version

# HTML body used for error responses.  It is filled in with a mapping that
# provides 'code' (the numeric HTTP status) and 'error' (its description).
ERROR_TEMPLATE = """<!doctype html public "-//IETF//DTD HTML 2.0//EN">
<html>
<head>
  <title>%(code)d %(error)s</title>
</head>
<body>
<h2>%(code)d %(error)s</h2>
</body>
</html>
"""

# "<name>/<version>" string sent as the value of the Server response header.
HTTP_SERVER = '%s/%s' % (HTTP_NAME, HTTP_VERSION)
 65   
 66  ### the Twisted resource that handles the base URL 
class HTTPStreamingResource(web_resource.Resource, log.Loggable):
    """
    Twisted web resource that serves the stream at the base URL.

    An incoming GET request is checked against the server limits and
    authenticated through C{httpauth}.  When accepted, the response headers
    are written straight to the socket, the connection is removed from the
    Twisted reactor and its file descriptor is handed to the streamer.
    Requests are tracked by file descriptor in C{_requests} until the
    streamer reports the client as removed.
    """

    # number of fd's to reserve for non-streaming
    __reserve_fds__ = 50

    logCategory = 'httpstreamer'

    # IResource interface variable; True means it will not chain requests
    # further down the path to other resource providers through
    # getChildWithDefault
    isLeaf = True

    def __init__(self, streamer, httpauth):
        """
        @param streamer: L{MultifdSinkStreamer}
        @param httpauth: object used to authenticate incoming requests
                         (C{startAuthentication}) and to clean up once a
                         client has been removed (C{cleanupAuth})
        """
        self.streamer = streamer
        self.httpauth = httpauth

        self._requests = {}            # request fd -> Request

        self.maxclients = self.getMaxAllowedClients(-1)
        self.maxbandwidth = -1         # not limited by default

        # If set, a URL to redirect a user to when the limits above are
        # reached
        self._redirectOnFull = None

        # Optional deferred notification of client removals: fd -> Deferred
        self._removing = {}

        self.loggers = \
            streamer.plugs['flumotion.component.plugs.loggers.Logger']

        self.logfilter = None

        web_resource.Resource.__init__(self)

    def clientRemoved(self, sink, fd, reason, stats):
        """
        React to the removal of a client.

        This is the callback attached to our flumotion component, not the
        GStreamer element.

        @param sink:   not used here
        @param fd:     the file descriptor of the removed client
        @param reason: not used here
        @param stats:  the statistics for the removed client, passed on to
                       L{_removeClient}
        """
        if fd in self._requests:
            request = self._requests[fd]
            self._removeClient(request, fd, stats)
        else:
            self.warning('[fd %5d] not found in _requests' % fd)

    def removeAllClients(self):
        """
        Start to remove all the clients connected (this will complete
        asynchronously from another thread)

        Returns a deferred that will fire once they're all removed.
        """
        l = []
        # Iterate over a snapshot of the keys: should a removal ever be
        # reported back while we are still looping, _removeClient would
        # delete from self._requests and break iteration over the live dict.
        for fd in list(self._requests):
            d = defer.Deferred()
            # register the deferred before asking for the removal, so the
            # removal notification can always find it
            self._removing[fd] = d
            l.append(d)
            self.streamer.remove_client(fd)

        return defer.DeferredList(l)

    def setRoot(self, path):
        """
        Register this resource as its own child under the given path.

        @param path: the path to register under
        """
        self.putChild(path, self)

    def setLogFilter(self, logfilter):
        """
        Set the filter consulted by L{_logRequestFromIP}.

        @param logfilter: object providing C{isInRange(ip)}, or None to log
                          requests from every IP
        """
        self.logfilter = logfilter

    def rotateLogs(self):
        """
        Ask every logger plug to rotate its log.
        """
        for logger in self.loggers:
            self.debug('rotating logger %r' % logger)
            logger.rotate()

    def logWrite(self, fd, ip, request, stats):
        """
        Send an 'http_session_completed' event to every logger plug.

        @param fd:      the file descriptor of the client (not used here)
        @param ip:      the IP address of the client
        @param request: the request of the client
        @param stats:   the statistics for the client; element 0 is used as
                        the number of bytes sent and element 3, divided by
                        gst.SECOND, as the time connected.  When false,
                        both values are logged as -1.

        @returns: a deferred list firing once all loggers handled the event
        """
        headers = request.getAllHeaders()

        if stats:
            bytes_sent = stats[0]
            time_connected = int(stats[3] / gst.SECOND)
        else:
            bytes_sent = -1
            time_connected = -1

        args = {'ip': ip,
                'time': time.gmtime(),
                'method': request.method,
                'uri': request.uri,
                'username': '-', # FIXME: put the httpauth name
                'get-parameters': request.args,
                'clientproto': request.clientproto,
                'response': request.code,
                'bytes-sent': bytes_sent,
                'referer': headers.get('referer', None),
                'user-agent': headers.get('user-agent', None),
                'time-connected': time_connected}

        l = []
        for logger in self.loggers:
            l.append(defer.maybeDeferred(
                logger.event, 'http_session_completed', args))

        return defer.DeferredList(l)

    def setUserLimit(self, limit):
        """
        Set the maximum number of clients.

        The value actually stored is whatever L{getMaxAllowedClients}
        allows for the requested limit.

        @param limit: the requested maximum number of clients
        """
        self.info('setting maxclients to %d' % limit)
        self.maxclients = self.getMaxAllowedClients(limit)
        # Log what we actually managed to set it to.
        self.info('set maxclients to %d' % self.maxclients)

    def setBandwidthLimit(self, limit):
        """
        Set the maximum bandwidth; a negative value means no limit.

        @param limit: the limit, compared in L{reachedServerLimits} against
                      the number of clients times the streamer's current
                      bitrate
        """
        self.maxbandwidth = limit
        self.info("set maxbandwidth to %d", self.maxbandwidth)

    def setRedirectionOnLimits(self, url):
        """
        Set the URL clients get redirected to when the server is full.

        @param url: the URL to redirect to, or None to refuse clients with
                    an error response instead
        """
        self._redirectOnFull = url

    # FIXME: rename to writeHeaders
    def _writeHeaders(self, request):
        """
        Write out the HTTP headers for the incoming HTTP request.

        The headers are written directly to the file descriptor of the
        request's transport, not through Twisted.

        @rtype:   boolean
        @returns: whether or not the file descriptor can be used further.
        """
        import sys

        fd = request.transport.fileno()
        fdi = request.fdIncoming

        # the fd could have been closed, in which case it will be -1
        if fd == -1:
            self.info('[fd %5d] Client gone before writing header' % fdi)
            return False
        if fd != fdi:
            self.warning('[fd %5d] does not match current fd %d' % (fdi, fd))
            return False

        headers = []

        def setHeader(field, name):
            headers.append('%s: %s\r\n' % (field, name))

        # Mimic Twisted as close as possible
        content = self.streamer.get_content_type()
        setHeader('Server', HTTP_SERVER)
        setHeader('Date', http.datetimeToString())
        setHeader('Cache-Control', 'no-cache')
        setHeader('Cache-Control', 'private')
        setHeader('Content-type', content)

        # ASF needs a Pragma header for live broadcasts
        # Apparently ASF breaks on WMP port 80 if you use the pragma header
        # - Sep 5 2006, so no 'Pragma: features=broadcast' is sent for
        # "video/x-ms-asf" / "audio/x-ms-asf".

        ### FIXME: there's a window where Twisted could have removed the
        # fd because the client disconnected.  Catch EBADF correctly here.
        try:
            # TODO: This is a non-blocking socket, we really should check
            # return values here, or just let twisted handle all of this
            # normally, and not hand off the fd until after twisted has
            # finished writing the headers.
            os.write(fd, 'HTTP/1.0 200 OK\r\n%s\r\n' % ''.join(headers))
        except OSError:
            # The exception is fetched through sys.exc_info() so this clause
            # does not depend on version-specific "except ... , name" syntax,
            # and errno/strerror are read as attributes instead of unpacking
            # the exception, which only works for two-argument errors.
            e = sys.exc_info()[1]
            if e.errno == errno.EBADF:
                self.info('[fd %5d] client gone before writing header' % fd)
            elif e.errno == errno.ECONNRESET:
                self.info(
                    '[fd %5d] client reset connection writing header' % fd)
            else:
                self.info(
                    '[fd %5d] unhandled write error when writing header: %s'
                    % (fd, e.strerror))
            # Nothing to clean up here; the caller decides what to do with
            # the request when we report failure.
            return False

        # tell TwistedWeb we already wrote headers ourselves
        request.startedWriting = True
        return True

    def isReady(self):
        """
        Tell whether the streamer can serve clients.

        @rtype:   boolean
        @returns: False as long as the streamer has no caps, True otherwise
        """
        # identity test: never rely on the caps object's own == operator
        if self.streamer.caps is None:
            self.debug('We have no caps yet')
            return False

        return True

    def getMaxAllowedClients(self, maxclients):
        """
        maximum number of allowed clients based on soft limit for number of
        open file descriptors and fd reservation. Increases soft limit to
        hard limit if possible.

        @param maxclients: the requested maximum number of clients, or -1
                           to ask for as many as the current soft limit
                           allows
        @returns: the number of clients that can be allowed; when
                  C{maxclients} is -1 and the soft limit is unlimited, -1
                  (a negative maximum is treated as "no limit" by
                  L{reachedServerLimits})
        """
        import sys

        (softmax, hardmax) = resource.getrlimit(resource.RLIMIT_NOFILE)
        # RLIM_INFINITY is a sentinel (it can be -1), not a real number of
        # descriptors, so it must never take part in comparisons or min().
        unlimited = resource.RLIM_INFINITY
        reserved = self.__reserve_fds__

        if maxclients == -1:
            if softmax == unlimited:
                return -1
            return softmax - reserved

        neededfds = maxclients + reserved

        # Bug in python 2.4.3, see http://sourceforge.net/tracker/index.php?func=detail&aid=1494314&group_id=5470&atid=105470
        if (sys.version_info[:3] == (2, 4, 3)
                and not hasattr(socket, "has_2_4_3_patch")):
            hardmax = 1024

        if softmax == unlimited or neededfds <= softmax:
            # the current soft limit already covers the request
            return maxclients

        # raise the soft limit as far as needed, but not beyond the hard one
        if hardmax == unlimited:
            lim = neededfds
        else:
            lim = min(neededfds, hardmax)
        resource.setrlimit(resource.RLIMIT_NOFILE, (lim, hardmax))
        return lim - reserved

    def reachedServerLimits(self):
        """
        Tell whether a new client would go over the configured limits.

        @rtype:   boolean
        @returns: True when the number of tracked requests has reached
                  C{maxclients}, or when one more client at the streamer's
                  current bitrate would reach C{maxbandwidth}.  A negative
                  limit is not enforced.
        """
        if self.maxclients >= 0 and len(self._requests) >= self.maxclients:
            return True
        elif self.maxbandwidth >= 0:
            # Reject if adding one more client would take us over the limit.
            if ((len(self._requests) + 1) *
                    self.streamer.getCurrentBitrate() >= self.maxbandwidth):
                return True
        return False

    def _addClient(self, request):
        """
        Add a request, so it can be used for statistics.

        @param request: the request
        @type  request: twisted.protocol.http.Request
        """
        fd = request.transport.fileno()
        self._requests[fd] = request

    def _logRequestFromIP(self, ip):
        """
        Returns whether we want to log a request from this IP; allows us to
        filter requests from automated monitoring systems.
        """
        if self.logfilter:
            return not self.logfilter.isInRange(ip)
        else:
            return True

    def _removeClient(self, request, fd, stats):
        """
        Removes a request and add logging.
        Note that it does not disconnect the client; it is called in reaction
        to a client disconnecting.
        It also removes the keycard if one was created.

        @param request: the request
        @type  request: L{twisted.protocols.http.Request}
        @param fd:      the file descriptor for the client being removed
        @type  fd:      L{int}
        @param stats:   the statistics for the removed client
        @type  stats:   GValueArray

        @returns: a deferred firing once the removal has been logged and any
                  deferred registered by L{removeAllClients} for this fd
                  has been fired
        """
        ip = request.getClientIP()
        if self._logRequestFromIP(ip):
            d = self.logWrite(fd, ip, request, stats)
        else:
            d = defer.succeed(True)
        self.info('[fd %5d] Client from %s disconnected' % (fd, ip))

        # We can't call request.finish(), since we already "stole" the fd, we
        # just loseConnection on the transport directly, and delete the
        # Request object, after cleaning up the bouncer bits.
        self.httpauth.cleanupAuth(fd)

        self.debug('[fd %5d] closing transport %r' % (fd, request.transport))
        # This will close the underlying socket. We first remove the request
        # from our fd->request map, since the moment we call this the fd might
        # get re-added.
        del self._requests[fd]
        request.transport.loseConnection()

        self.debug('[fd %5d] closed transport %r' % (fd, request.transport))

        def _done(_):
            if fd in self._removing:
                self.debug("client is removed; firing deferred")
                removeD = self._removing.pop(fd)
                removeD.callback(None)
        d.addCallback(_done)
        return d

    def handleAuthenticatedRequest(self, res, request):
        """
        Serve a request once authentication has succeeded.

        A GET request becomes a streaming client; a HEAD request only gets
        the headers and is finished.

        @param res:     the result of the authentication, returned as is
        @param request: the request

        @raises AssertionError: for any other HTTP method
        """
        if request.method == 'GET':
            self._handleNewClient(request)
        elif request.method == 'HEAD':
            self.debug('handling HEAD request')
            self._writeHeaders(request)
            request.finish()
        else:
            raise AssertionError(
                'unexpected HTTP method %r' % (request.method, ))

        return res

    ### resource.Resource methods

    # this is the callback receiving the request initially
    def _render(self, request):
        """
        Handle an incoming request; used for both GET and HEAD.

        @param request: the request
        @returns: the error page when the server is full, and
                  server.NOT_DONE_YET otherwise
        """
        fd = request.transport.fileno()
        # we store the fd again in the request using it as an id for later
        # on, so we can check when an fd went away (being -1) inbetween
        request.fdIncoming = fd

        self.info('[fd %5d] Incoming client connection from %s' % (
            fd, request.getClientIP()))
        self.debug('[fd %5d] _render(): request %s' % (
            fd, request))

        if not self.isReady():
            return self._handleNotReady(request)
        elif self.reachedServerLimits():
            return self._handleServerFull(request)

        self.debug('_render(): asked for (possible) authentication')
        d = self.httpauth.startAuthentication(request)
        d.addCallback(self.handleAuthenticatedRequest, request)
        # Authentication has failed and we've written a response; nothing
        # more to do
        d.addErrback(lambda x: None)

        # we MUST return this from our _render.
        return server.NOT_DONE_YET

    def _handleNotReady(self, request):
        """
        Handle a request arriving before the streamer is ready.

        NOTE(review): nothing is written and the request is not finished
        here, so the client is left waiting -- confirm this is intended.

        @param request: the request
        @returns: server.NOT_DONE_YET
        """
        self.debug('Not sending data, it\'s not ready')
        return server.NOT_DONE_YET

    def _handleServerFull(self, request):
        """
        Refuse or redirect a request because the server limits are reached.

        With a redirection URL set the client gets a FOUND response
        pointing at it, otherwise a SERVICE_UNAVAILABLE response.

        @param request: the request
        @returns: the HTML body of the response
        """
        if self._redirectOnFull:
            self.debug("Redirecting client, client limit %d reached",
                self.maxclients)
            error_code = http.FOUND
            request.setHeader('location', self._redirectOnFull)
        else:
            self.debug('Refusing clients, client limit %d reached' %
                self.maxclients)
            error_code = http.SERVICE_UNAVAILABLE

        request.setHeader('content-type', 'text/html')

        # Identify ourselves with the full "name/version" string, the same
        # value _writeHeaders sends, instead of the bare version number.
        request.setHeader('server', HTTP_SERVER)
        request.setResponseCode(error_code)

        return ERROR_TEMPLATE % {'code': error_code,
                                 'error': http.RESPONSES[error_code]}

    def _handleNewClient(self, request):
        """
        Start streaming to the client of an accepted GET request.

        Writes the headers, takes the file descriptor away from Twisted and
        hands it to the streamer.  The request is only registered as a
        client once its file descriptor has been verified to be open.

        @param request: the request
        """
        import sys

        # everything fulfilled, serve to client
        fdi = request.fdIncoming
        if not self._writeHeaders(request):
            self.debug("[fd %5d] not adding as a client" % fdi)
            return

        # take over the file descriptor from Twisted by removing them from
        # the reactor
        # spiv told us to remove* on request.transport, and that works
        # then we figured out that a new request is only a Reader, so we
        # remove the removedWriter - this is because we never write to the
        # socket through twisted, only with direct os.write() calls from
        # _writeHeaders.
        fd = fdi
        self.debug("taking away [fd %5d] from Twisted" % fd)
        reactor.removeReader(request.transport)

        # check if it's really an open fd (i.e. that twisted didn't close it
        # before the removeReader() call)
        try:
            fcntl.fcntl(fd, fcntl.F_GETFL)
        except IOError:
            # fetched through sys.exc_info() so this clause does not depend
            # on version-specific "except ... , name" syntax
            e = sys.exc_info()[1]
            if e.errno == errno.EBADF:
                self.warning("[fd %5d] is not actually open, ignoring" % fd)
            else:
                self.warning("[fd %5d] error during check: %s (%d)" % (
                    fd, e.strerror, e.errno))
            return

        # Register the request only now: the streamer never learns about a
        # client rejected above, so it would never report it as removed and
        # a stale entry would keep counting against the client limit.
        self._addClient(request)

        # hand it to multifdsink
        self.streamer.add_client(fd)
        ip = request.getClientIP()

        self.info('[fd %5d] Started streaming to %s' % (fd, ip))

    render_GET = _render
    render_HEAD = _render
class HTTPRoot(web_resource.Resource, log.Loggable):
    """
    Root resource that looks up children by their full path.
    """
    logCategory = "httproot"

    def getChildWithDefault(self, path, request):
        """
        Look up the child registered under the complete remaining path.

        We override this method so that we can look up tree resources
        directly without having their parents.
        There's probably a more Twisted way of doing this, but ...

        @param path:    the first segment of the path to look up
        @param request: the request; its C{postpath} holds the remaining
                        path segments
        @returns: whatever the base class lookup returns for the full path
        """
        fullPath = path
        if request.postpath:
            # str.join instead of the deprecated string.join() function
            fullPath += '/' + '/'.join(request.postpath)
        self.debug("Incoming request %r for path %s" % (request, fullPath))
        r = web_resource.Resource.getChildWithDefault(self, fullPath, request)
        self.debug("Returning resource %r" % r)
        return r