Package flumotion :: Package component :: Package misc :: Package httpfile :: Module httpfile
[hide private]

Source Code for Module flumotion.component.misc.httpfile.httpfile

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_component_httpserver -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21  import os 
 22  import time 
 23  import string 
 24   
 25  from twisted.web import resource, static, server, http 
 26  from twisted.web import error as weberror 
 27  from twisted.internet import defer, reactor, error 
 28  from twisted.cred import credentials 
 29  from zope.interface import implements 
 30   
 31  from flumotion.component import component 
 32  from flumotion.common import log, messages, errors, netutils, interfaces 
 33  from flumotion.component.component import moods 
 34  from flumotion.component.misc.porter import porterclient 
 35  from flumotion.component.base import http as httpbase 
 36   
 37  from flumotion.twisted import fdserver 
 38   
 39  from flumotion.component.misc.httpfile import file 
 40   
 41  from flumotion.common.messages import N_ 
 42  T_ = messages.gettexter('flumotion') 
 43   
44 -class CancellableRequest(server.Request):
45
46 - def __init__(self, channel, queued):
47 server.Request.__init__(self, channel, queued) 48 49 self._component = channel.factory.component 50 self._completed = False 51 self._transfer = None 52 53 self._bytes_written = 0 54 self._start_time = time.time() 55 self._lastTimeWritten = self._start_time 56 57 self._fd = self.transport.fileno() 58 59 self._component.requestStarted(self)
60
61 - def write(self, data):
62 server.Request.write(self, data) 63 64 self._bytes_written += len(data) 65 self._lastTimeWritten = time.time()
66
67 - def finish(self):
68 server.Request.finish(self) 69 fd = self.transport.fileno() 70 # We sent Connection: close, so we must close the connection 71 self.transport.loseConnection() 72 self.requestCompleted(fd)
73
74 - def connectionLost(self, reason):
75 server.Request.connectionLost(self, reason) 76 self.requestCompleted(self._fd)
77
78 - def requestCompleted(self, fd):
79 if not self._completed: 80 self._component.requestFinished(self, self._bytes_written, 81 time.time() - self._start_time, fd) 82 self._completed = True
83
84 -class Site(server.Site):
85 requestFactory = CancellableRequest 86
87 - def __init__(self, resource, component):
88 server.Site.__init__(self, resource) 89 90 self.component = component
91
92 -class HTTPFileMedium(component.BaseComponentMedium):
93 - def __init__(self, comp):
94 """ 95 @type comp: L{HTTPFileStreamer} 96 """ 97 component.BaseComponentMedium.__init__(self, comp)
98
99 - def authenticate(self, bouncerName, keycard):
100 """ 101 @rtype: L{twisted.internet.defer.Deferred} firing a keycard or None. 102 """ 103 return self.callRemote('authenticate', bouncerName, keycard)
104
105 - def keepAlive(self, bouncerName, issuerName, ttl):
106 """ 107 @rtype: L{twisted.internet.defer.Deferred} 108 """ 109 return self.callRemote('keepAlive', bouncerName, issuerName, ttl)
110
111 - def removeKeycardId(self, bouncerName, keycardId):
112 """ 113 @rtype: L{twisted.internet.defer.Deferred} 114 """ 115 return self.callRemote('removeKeycardId', bouncerName, keycardId)
116
117 - def remote_expireKeycard(self, keycardId):
118 return self.comp.httpauth.expireKeycard(keycardId)
119
120 - def remote_getStreamData(self):
121 return self.comp.getStreamData()
122
123 - def remote_getLoadData(self):
124 return self.comp.getLoadData()
125
126 - def remote_updatePorterDetails(self, path, username, password):
127 return self.comp.updatePorterDetails(path, username, password)
128
129 - def remote_rotateLog(self):
130 return self.comp.rotateLog()
131
132 -class HTTPFileStreamer(component.BaseComponent, log.Loggable):
133 implements(interfaces.IStreamingComponent) 134 135 componentMediumClass = HTTPFileMedium 136 137 REQUEST_TIMEOUT = 30 # Time out requests after this many seconds of 138 # inactivity 139
140 - def init(self):
141 self.mountPoint = None 142 self.type = None 143 self.port = None 144 self.hostname = None 145 self._loggers = [] 146 self._logfilter = None 147 self.httpauth = None 148 149 self._description = 'On-Demand Flumotion Stream' 150 151 self._singleFile = False 152 self._connected_clients = {} # fd -> CancellableRequest 153 self._total_bytes_written = 0 154 155 self._pbclient = None 156 157 self._twistedPort = None 158 self._timeoutRequestsCallLater = None 159 160 self._pendingDisconnects = {} 161 162 # FIXME: maybe we want to allow the configuration to specify 163 # additional mime -> File class mapping ? 164 self._mimeToResource = { 165 'video/x-flv': file.FLVFile, 166 } 167 168 # store number of connected clients 169 self.uiState.addKey("connected-clients", 0) 170 self.uiState.addKey("bytes-transferred", 0)
171
172 - def do_check(self):
173 props = self.config['properties'] 174 self.fixRenamedProperties(props, [ 175 ('issuer', 'issuer-class'), 176 ('porter_socket_path', 'porter-socket-path'), 177 ('porter_username', 'porter-username'), 178 ('porter_password', 'porter-password'), 179 ('mount_point', 'mount-point') 180 ]) 181 182 if props.get('type', 'master') == 'slave': 183 for k in 'socket-path', 'username', 'password': 184 if not 'porter-' + k in props: 185 msg = 'slave mode, missing required property porter-%s' % k 186 return defer.fail(errors.ConfigError(msg)) 187 188 path = props.get('path', None) 189 if path is None: 190 msg = "missing required property 'path'" 191 return defer.fail(errors.ConfigError(msg)) 192 if os.path.isfile(path): 193 self._singleFile = True 194 elif os.path.isdir(path): 195 self._singleFile = False 196 else: 197 msg = "the file or directory specified in 'path': %s does " \ 198 "not exist or is neither a file nor directory" % path 199 return defer.fail(errors.ConfigError(msg))
200
201 - def have_properties(self, props):
202 desc = props.get('description', None) 203 if desc: 204 self._description = desc 205 206 # always make sure the mount point starts with / 207 mountPoint = props.get('mount-point', '/') 208 if not mountPoint.startswith('/'): 209 mountPoint = '/' + mountPoint 210 self.mountPoint = mountPoint 211 self.hostname = props.get('hostname', None) 212 if not self.hostname: 213 self.hostname = netutils.guess_public_hostname() 214 215 self.filePath = props.get('path') 216 self.type = props.get('type', 'master') 217 self.port = props.get('port', 8801) 218 if self.type == 'slave': 219 # already checked for these in do_check 220 self._porterPath = props['porter-socket-path'] 221 self._porterUsername = props['porter-username'] 222 self._porterPassword = props['porter-password'] 223 self._loggers = \ 224 self.plugs.get('flumotion.component.plugs.loggers.Logger', []) 225 226 self.httpauth = httpbase.HTTPAuthentication(self) 227 228 if 'bouncer' in props: 229 self.httpauth.setBouncerName(props['bouncer']) 230 if 'issuer-class' in props: 231 self.httpauth.setIssuerClass(props['issuer-class']) 232 if 'ip-filter' in props: 233 filter = http.LogFilter() 234 for f in props['ip-filter']: 235 filter.addIPFilter(f) 236 self._logfilter = filter
237
238 - def do_setup(self):
239 self.have_properties(self.config['properties']) 240 self.debug('Starting with mount point "%s"' % self.mountPoint) 241 factory = file.MimedFileFactory(self.httpauth, 242 mimeToResource=self._mimeToResource) 243 if self.mountPoint == '/': 244 self.debug('mount point / - create File resource as root') 245 # directly create a File resource for the path 246 root = factory.create(self.filePath) 247 else: 248 # split path on / and add iteratively twisted.web resources 249 # Asking for '' or '/' will retrieve the root Resource's '' child, 250 # so the split on / returning a first list value '' is correct 251 self.debug('mount point %s - creating root Resource and children', 252 self.mountPoint) 253 root = resource.Resource() 254 children = string.split(self.mountPoint[1:], '/') 255 parent = root 256 for child in children[:-1]: 257 res = resource.Resource() 258 self.debug("Putting Resource at %s", child) 259 parent.putChild(child, res) 260 parent = res 261 fileResource = factory.create(self.filePath) 262 self.debug("Putting resource %r at %r", fileResource, children[-1]) 263 parent.putChild(children[-1], fileResource) 264 265 self._timeoutRequestsCallLater = reactor.callLater( 266 self.REQUEST_TIMEOUT, self._timeoutRequests) 267 268 d = defer.Deferred() 269 if self.type == 'slave': 270 # Streamer is slaved to a porter. 271 if self._singleFile: 272 self._pbclient = porterclient.HTTPPorterClientFactory( 273 Site(root, self), [self.mountPoint], d) 274 else: 275 self._pbclient = porterclient.HTTPPorterClientFactory( 276 Site(root, self), [], d, 277 prefixes=[self.mountPoint]) 278 creds = credentials.UsernamePassword(self._porterUsername, 279 self._porterPassword) 280 self._pbclient.startLogin(creds, self._pbclient.medium) 281 self.debug("Starting porter login!") 282 # This will eventually cause d to fire 283 reactor.connectWith(fdserver.FDConnector, self._porterPath, 284 self._pbclient, 10, checkPID=False) 285 else: 286 # File Streamer is standalone. 287 try: 288 self.debug('Going to listen on port %d' % self.port) 289 iface = "" 290 # we could be listening on port 0, in which case we need 291 # to figure out the actual port we listen on 292 self._twistedPort = reactor.listenTCP(self.port, 293 Site(root, self), interface=iface) 294 self.port = self._twistedPort.getHost().port 295 self.debug('Listening on port %d' % self.port) 296 except error.CannotListenError: 297 t = 'Port %d is not available.' % self.port 298 self.warning(t) 299 m = messages.Error(T_(N_( 300 "Network error: TCP port %d is not available."), self.port)) 301 self.addMessage(m) 302 self.setMood(moods.sad) 303 return defer.fail(errors.ComponentStartHandledError(t)) 304 # fire callback so component gets happy 305 d.callback(None) 306 # we are responsible for setting component happy 307 def setComponentHappy(result): 308 self.httpauth.scheduleKeepAlive() 309 self.setMood(moods.happy) 310 return result
311 d.addCallback(setComponentHappy) 312 return d
313
314 - def do_stop(self):
315 if self.httpauth: 316 self.httpauth.stopKeepAlive() 317 if self._timeoutRequestsCallLater: 318 self._timeoutRequestsCallLater.cancel() 319 self._timeoutRequestsCallLater = None 320 if self._twistedPort: 321 self._twistedPort.stopListening() 322 323 l = [self.remove_all_clients()] 324 if self.type == 'slave' and self._pbclient: 325 l.append(self._pbclient.deregisterPath(self.mountPoint)) 326 return defer.DeferredList(l)
327
328 - def updatePorterDetails(self, path, username, password):
329 """ 330 Provide a new set of porter login information, for when we're in slave 331 mode and the porter changes. 332 If we're currently connected, this won't disconnect - it'll just change 333 the information so that next time we try and connect we'll use the 334 new ones 335 """ 336 if self.type == 'slave': 337 self._porterUsername = username 338 self._porterPassword = password 339 340 creds = credentials.UsernamePassword(self._porterUsername, 341 self._porterPassword) 342 self._pbclient.startLogin(creds, self.medium) 343 344 # If we've changed paths, we must do some extra work. 345 if path != self._porterPath: 346 self._porterPath = path 347 self._pbclient.stopTrying() # Stop trying to connect with the 348 # old connector. 349 self._pbclient.resetDelay() 350 reactor.connectWith( 351 fdserver.FDConnector, self._porterPath, 352 self._pbclient, 10, checkPID=False) 353 else: 354 raise errors.WrongStateError( 355 "Can't specify porter details in master mode")
356
357 - def _timeoutRequests(self):
358 now = time.time() 359 for request in self._connected_clients.values(): 360 if now - request._lastTimeWritten > self.REQUEST_TIMEOUT: 361 self.debug("Timing out connection") 362 # Apparently this is private API. However, calling 363 # loseConnection is not sufficient - it won't drop the 364 # connection until the send queue is empty, which might never 365 # happen for an uncooperative client 366 request.channel.transport.connectionLost( 367 errors.TimeoutException()) 368 369 self._timeoutRequestsCallLater = reactor.callLater( 370 self.REQUEST_TIMEOUT, self._timeoutRequests)
371
372 - def remove_client(self, fd):
373 """ 374 Remove a client when requested. 375 376 Used by keycard expiry. 377 """ 378 if fd in self._connected_clients: 379 request = self._connected_clients[fd] 380 self.debug("Removing client for fd %d", fd) 381 request.unregisterProducer() 382 request.channel.transport.loseConnection() 383 else: 384 self.debug("No client with fd %d found", fd)
385
386 - def remove_all_clients(self):
387 l = [] 388 for fd in self._connected_clients: 389 d = defer.Deferred() 390 self._pendingDisconnects[fd] = d 391 l.append(d) 392 393 request = self._connected_clients[fd] 394 request.unregisterProducer() 395 request.channel.transport.loseConnection() 396 397 self.debug("Waiting for %d clients to finish", len(l)) 398 return defer.DeferredList(l)
399
400 - def requestStarted(self, request):
401 fd = request.transport.fileno() # ugly! 402 self._connected_clients[fd] = request 403 self.uiState.set("connected-clients", len(self._connected_clients))
404
405 - def requestFinished(self, request, bytesWritten, timeConnected, fd):
406 self.httpauth.cleanupAuth(fd) 407 headers = request.getAllHeaders() 408 409 ip = request.getClientIP() 410 if not self._logfilter or not self._logfilter.isInRange(ip): 411 args = {'ip': ip, 412 'time': time.gmtime(), 413 'method': request.method, 414 'uri': request.uri, 415 'username': '-', # FIXME: put the httpauth name 416 'get-parameters': request.args, 417 'clientproto': request.clientproto, 418 'response': request.code, 419 'bytes-sent': bytesWritten, 420 'referer': headers.get('referer', None), 421 'user-agent': headers.get('user-agent', None), 422 'time-connected': timeConnected} 423 424 l = [] 425 for logger in self._loggers: 426 l.append(defer.maybeDeferred( 427 logger.event, 'http_session_completed', args)) 428 d = defer.DeferredList(l) 429 else: 430 d = defer.succeed(None) 431 432 del self._connected_clients[fd] 433 434 self.uiState.set("connected-clients", len(self._connected_clients)) 435 436 self._total_bytes_written += bytesWritten 437 self.uiState.set("bytes-transferred", self._total_bytes_written) 438 439 def firePendingDisconnect(_): 440 self.debug("Logging completed") 441 if fd in self._pendingDisconnects: 442 pending = self._pendingDisconnects.pop(fd) 443 self.debug("Firing pending disconnect deferred") 444 pending.callback(None)
445 d.addCallback(firePendingDisconnect) 446
447 - def getDescription(self):
448 return self._description
449
450 - def getUrl(self):
451 return "http://%s:%d%s" % (self.hostname, self.port, self.mountPoint)
452
453 - def getStreamData(self):
454 socket = 'flumotion.component.plugs.streamdata.StreamDataProvider' 455 if self.plugs[socket]: 456 plug = self.plugs[socket][-1] 457 return plug.getStreamData() 458 else: 459 return { 460 'protocol': 'HTTP', 461 'description': self._description, 462 'url' : self.getUrl() 463 }
464
465 - def getLoadData(self):
466 """ 467 Return a tuple (deltaadded, deltaremoved, bytes_transferred, 468 current_clients, current_load) of our current bandwidth and user values. 469 The deltas and current_load are NOT currently implemented here, we set 470 them as zero. 471 """ 472 bytesTransferred = self._total_bytes_written 473 for request in self._connected_clients.values(): 474 if request._transfer: 475 bytesTransferred += request._transfer.bytesWritten 476 477 return (0, 0, bytesTransferred, len(self._connected_clients), 0)
478
479 - def rotateLog(self):
480 """ 481 Close the logfile, then reopen using the previous logfilename 482 """ 483 for logger in self._loggers: 484 self.debug('rotating logger %r' % logger) 485 logger.rotate()
486