1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 import os
22 import time
23 import string
24
25 from twisted.web import resource, static, server, http
26 from twisted.web import error as weberror
27 from twisted.internet import defer, reactor, error
28 from twisted.cred import credentials
29 from zope.interface import implements
30
31 from flumotion.component import component
32 from flumotion.common import log, messages, errors, netutils, interfaces
33 from flumotion.component.component import moods
34 from flumotion.component.misc.porter import porterclient
35 from flumotion.component.base import http as httpbase
36
37 from flumotion.twisted import fdserver
38
39 from flumotion.component.misc.httpfile import file
40
41 from flumotion.common.messages import N_
42 T_ = messages.gettexter('flumotion')
43
45
# Body of a request constructor. The enclosing `class` and `def` lines are not
# visible in this view (presumably __init__(self, channel, queued) -- confirm).
# The twisted server.Request initialiser is chained to first.
47 server.Request.__init__(self, channel, queued)
48
# The owning component is reached through the channel's factory.
49 self._component = channel.factory.component
# Latch read by the completion code so the component is notified only once.
50 self._completed = False
51 self._transfer = None
52
# Per-request accounting: a byte counter and wall-clock timestamps.
53 self._bytes_written = 0
54 self._start_time = time.time()
55 self._lastTimeWritten = self._start_time
56
# File descriptor of the underlying transport, captured at construction time.
57 self._fd = self.transport.fileno()
58
# Register this request with the component.
59 self._component.requestStarted(self)
60
66
73
77
# Tail of a completion handler whose header and opening lines are not visible
# here; `fd` is defined in the unseen part. Reports the byte count and the
# elapsed time since construction to the component.
79 if not self._completed:
80 self._component.requestFinished(self, self._bytes_written,
81 time.time() - self._start_time, fd)
# Set the latch so a repeated call does not report the same request again.
82 self._completed = True
83
84 -class Site(server.Site):
91
98
# Body of a medium method whose `def` line is not visible (presumably
# authenticate(self, bouncerName, keycard) -- confirm). It forwards the bouncer
# name and keycard to the remote 'authenticate' call and returns its result.
100 """
101 @rtype: L{twisted.internet.defer.Deferred} firing a keycard or None.
102 """
103 return self.callRemote('authenticate', bouncerName, keycard)
104
def keepAlive(self, bouncerName, issuerName, ttl):
    """
    Forward a keep-alive request to the remote side.

    All three arguments are passed through unchanged to the remote
    'keepAlive' method via C{self.callRemote}.

    @param bouncerName: name of the bouncer the request is addressed to.
    @param issuerName:  name of the keycard issuer to keep alive.
    @param ttl:         time-to-live value forwarded as-is.

    @rtype: L{twisted.internet.defer.Deferred}
    """
    return self.callRemote('keepAlive', bouncerName, issuerName, ttl)
110
# Body of a medium method whose `def` line is not visible (presumably
# removeKeycardId(self, bouncerName, keycardId) -- confirm). It forwards both
# arguments to the remote 'removeKeycardId' call and returns its result.
112 """
113 @rtype: L{twisted.internet.defer.Deferred}
114 """
115 return self.callRemote('removeKeycardId', bouncerName, keycardId)
116
119
122
125
128
131
# Class-level declarations; the `class` line itself is not visible in this view.
# Declares that the class provides the streaming-component interface.
133 implements(interfaces.IStreamingComponent)
134
# Medium class associated with this component.
135 componentMediumClass = HTTPFileMedium
136
# Delay handed to reactor.callLater when scheduling self._timeoutRequests.
137 REQUEST_TIMEOUT = 30
138
139
# Initialisation body (its `def` line is not visible in this view). Resets the
# configuration, client bookkeeping and networking attributes to defaults.
141 self.mountPoint = None
142 self.type = None
143 self.port = None
144 self.hostname = None
145 self._loggers = []
146 self._logfilter = None
147 self.httpauth = None
148
# Default description, replaced later if a 'description' property is set.
149 self._description = 'On-Demand Flumotion Stream'
150
151 self._singleFile = False
# Active requests, keyed by the file descriptor of their transport.
152 self._connected_clients = {}
153 self._total_bytes_written = 0
154
155 self._pbclient = None
156
157 self._twistedPort = None
158 self._timeoutRequestsCallLater = None
159
# fd -> Deferred, fired once that client's completion has been logged.
160 self._pendingDisconnects = {}
161
162
163
# MIME type -> resource class, handed to the file factory at setup.
164 self._mimeToResource = {
165 'video/x-flv': file.FLVFile,
166 }
167
168
# Counters published through the UI state, both starting at zero.
169 self.uiState.addKey("connected-clients", 0)
170 self.uiState.addKey("bytes-transferred", 0)
171
# Configuration-check body (its `def` line is not visible in this view).
# Every failure path returns a failed Deferred wrapping errors.ConfigError.
173 props = self.config['properties']
# Pairs look like (old underscore name, new dashed name); presumably the helper
# renames the former to the latter -- confirm against fixRenamedProperties.
174 self.fixRenamedProperties(props, [
175 ('issuer', 'issuer-class'),
176 ('porter_socket_path', 'porter-socket-path'),
177 ('porter_username', 'porter-username'),
178 ('porter_password', 'porter-password'),
179 ('mount_point', 'mount-point')
180 ])
181
# Slave mode requires all three porter-* connection properties.
182 if props.get('type', 'master') == 'slave':
183 for k in 'socket-path', 'username', 'password':
184 if not 'porter-' + k in props:
185 msg = 'slave mode, missing required property porter-%s' % k
186 return defer.fail(errors.ConfigError(msg))
187
# 'path' is mandatory and must name an existing file or directory.
188 path = props.get('path', None)
189 if path is None:
190 msg = "missing required property 'path'"
191 return defer.fail(errors.ConfigError(msg))
# Record whether a single file or a whole directory is being served.
192 if os.path.isfile(path):
193 self._singleFile = True
194 elif os.path.isdir(path):
195 self._singleFile = False
196 else:
197 msg = "the file or directory specified in 'path': %s does " \
198 "not exist or is neither a file nor directory" % path
199 return defer.fail(errors.ConfigError(msg))
200
# Property-loading body (its `def` line is not visible; presumably this is the
# have_properties(props) method called during setup -- confirm). Copies the
# component properties from `props` onto instance attributes.
202 desc = props.get('description', None)
203 if desc:
204 self._description = desc
205
206
# Normalise the mount point so that it always starts with '/'.
207 mountPoint = props.get('mount-point', '/')
208 if not mountPoint.startswith('/'):
209 mountPoint = '/' + mountPoint
210 self.mountPoint = mountPoint
# Fall back to a guessed public hostname when none is configured.
211 self.hostname = props.get('hostname', None)
212 if not self.hostname:
213 self.hostname = netutils.guess_public_hostname()
214
215 self.filePath = props.get('path')
216 self.type = props.get('type', 'master')
217 self.port = props.get('port', 8801)
# Slave mode: remember how to reach and log in to the porter.
218 if self.type == 'slave':
219
220 self._porterPath = props['porter-socket-path']
221 self._porterUsername = props['porter-username']
222 self._porterPassword = props['porter-password']
# Logger plugs; an empty list when none are configured.
223 self._loggers = \
224 self.plugs.get('flumotion.component.plugs.loggers.Logger', [])
225
226 self.httpauth = httpbase.HTTPAuthentication(self)
227
228 if 'bouncer' in props:
229 self.httpauth.setBouncerName(props['bouncer'])
230 if 'issuer-class' in props:
231 self.httpauth.setIssuerClass(props['issuer-class'])
# NOTE(review): per this file's imports `http` is twisted.web.http, while the
# Flumotion HTTP helper module is imported as `httpbase`. Confirm LogFilter is
# looked up on the intended module; `filter` also shadows the builtin.
232 if 'ip-filter' in props:
233 filter = http.LogFilter()
234 for f in props['ip-filter']:
235 filter.addIPFilter(f)
236 self._logfilter = filter
237
# Start-up body (its `def` line is not visible in this view). Builds the web
# resource tree, then serves it through a porter (slave) or on its own TCP
# port (master). Returns a Deferred whose callback sets the mood to happy.
239 self.have_properties(self.config['properties'])
240 self.debug('Starting with mount point "%s"' % self.mountPoint)
241 factory = file.MimedFileFactory(self.httpauth,
242 mimeToResource=self._mimeToResource)
# Mount point '/': the file resource itself becomes the site root.
243 if self.mountPoint == '/':
244 self.debug('mount point / - create File resource as root')
245
246 root = factory.create(self.filePath)
247 else:
248
249
250
251 self.debug('mount point %s - creating root Resource and children',
252 self.mountPoint)
# Otherwise build one empty Resource per leading path segment and hang the
# file resource on the last segment.
253 root = resource.Resource()
254 children = string.split(self.mountPoint[1:], '/')
255 parent = root
256 for child in children[:-1]:
257 res = resource.Resource()
258 self.debug("Putting Resource at %s", child)
259 parent.putChild(child, res)
260 parent = res
261 fileResource = factory.create(self.filePath)
262 self.debug("Putting resource %r at %r", fileResource, children[-1])
263 parent.putChild(children[-1], fileResource)
264
# Schedule the request-timeout callback; the handle is kept so that the stop
# code can cancel it.
265 self._timeoutRequestsCallLater = reactor.callLater(
266 self.REQUEST_TIMEOUT, self._timeoutRequests)
267
268 d = defer.Deferred()
269 if self.type == 'slave':
270
# A single file is registered as an exact path, a directory as a prefix. `d` is
# handed to the porter client factory, which presumably fires it -- confirm.
271 if self._singleFile:
272 self._pbclient = porterclient.HTTPPorterClientFactory(
273 Site(root, self), [self.mountPoint], d)
274 else:
275 self._pbclient = porterclient.HTTPPorterClientFactory(
276 Site(root, self), [], d,
277 prefixes=[self.mountPoint])
278 creds = credentials.UsernamePassword(self._porterUsername,
279 self._porterPassword)
280 self._pbclient.startLogin(creds, self._pbclient.medium)
281 self.debug("Starting porter login!")
282
# Connect to the porter's socket path with the FD-passing connector.
283 reactor.connectWith(fdserver.FDConnector, self._porterPath,
284 self._pbclient, 10, checkPID=False)
285 else:
286
287 try:
288 self.debug('Going to listen on port %d' % self.port)
# An empty interface string is passed to listenTCP.
289 iface = ""
290
291
292 self._twistedPort = reactor.listenTCP(self.port,
293 Site(root, self), interface=iface)
# Read back the port actually bound.
294 self.port = self._twistedPort.getHost().port
295 self.debug('Listening on port %d' % self.port)
296 except error.CannotListenError:
# Port unavailable: post an error message, set the mood to sad and fail with
# ComponentStartHandledError.
297 t = 'Port %d is not available.' % self.port
298 self.warning(t)
299 m = messages.Error(T_(N_(
300 "Network error: TCP port %d is not available."), self.port))
301 self.addMessage(m)
302 self.setMood(moods.sad)
303 return defer.fail(errors.ComponentStartHandledError(t))
304
# Presumably the master-mode path, where listening has already succeeded, so
# the Deferred is fired here (indentation is lost in this view -- confirm).
305 d.callback(None)
306
# On success, start keycard keep-alives and mark the component happy.
307 def setComponentHappy(result):
308 self.httpauth.scheduleKeepAlive()
309 self.setMood(moods.happy)
310 return result
311 d.addCallback(setComponentHappy)
312 return d
313
# Shutdown body (its `def` line is not visible in this view). Stops the
# keep-alive, cancels the pending timeout call, stops listening, then returns
# a DeferredList over the client disconnects and any porter deregistration.
315 if self.httpauth:
316 self.httpauth.stopKeepAlive()
317 if self._timeoutRequestsCallLater:
318 self._timeoutRequestsCallLater.cancel()
319 self._timeoutRequestsCallLater = None
320 if self._twistedPort:
321 self._twistedPort.stopListening()
322
323 l = [self.remove_all_clients()]
# NOTE(review): directory mode registers the mount point as a prefix at setup,
# yet deregisterPath is used unconditionally here -- confirm this is intended.
324 if self.type == 'slave' and self._pbclient:
325 l.append(self._pbclient.deregisterPath(self.mountPoint))
326 return defer.DeferredList(l)
327
# Porter-details update body (its `def` line is not visible; the code reads
# `path`, `username` and `password`, presumably its parameters). Raises
# errors.WrongStateError when the component is not in slave mode.
329 """
330 Provide a new set of porter login information, for when we're in slave
331 mode and the porter changes.
332 If we're currently connected, this won't disconnect - it'll just change
333 the information so that next time we try and connect we'll use the
334 new ones
335 """
336 if self.type == 'slave':
337 self._porterUsername = username
338 self._porterPassword = password
339
340 creds = credentials.UsernamePassword(self._porterUsername,
341 self._porterPassword)
# NOTE(review): the setup code passes self._pbclient.medium to startLogin,
# whereas this passes self.medium -- confirm which one is intended.
342 self._pbclient.startLogin(creds, self.medium)
343
344
# Reconnect only when the socket path actually changed.
345 if path != self._porterPath:
346 self._porterPath = path
347 self._pbclient.stopTrying()
348
349 self._pbclient.resetDelay()
350 reactor.connectWith(
351 fdserver.FDConnector, self._porterPath,
352 self._pbclient, 10, checkPID=False)
353 else:
354 raise errors.WrongStateError(
355 "Can't specify porter details in master mode")
356
371
# Single-client removal body (its `def` line is not visible; `fd` is presumably
# its parameter). Looks the request up by fd and drops its connection.
373 """
374 Remove a client when requested.
375
376 Used by keycard expiry.
377 """
378 if fd in self._connected_clients:
379 request = self._connected_clients[fd]
380 self.debug("Removing client for fd %d", fd)
# Detach the producer, then close the underlying transport.
381 request.unregisterProducer()
382 request.channel.transport.loseConnection()
383 else:
384 self.debug("No client with fd %d found", fd)
385
# All-clients removal body (its `def` line is not visible in this view).
# Creates one Deferred per connected fd, stores it in _pendingDisconnects,
# closes the connection, and returns a DeferredList over those Deferreds.
387 l = []
388 for fd in self._connected_clients:
389 d = defer.Deferred()
390 self._pendingDisconnects[fd] = d
391 l.append(d)
392
# Same teardown sequence as the single-client removal code.
393 request = self._connected_clients[fd]
394 request.unregisterProducer()
395 request.channel.transport.loseConnection()
396
397 self.debug("Waiting for %d clients to finish", len(l))
398 return defer.DeferredList(l)
399
# Request-start bookkeeping body (its `def` line is not visible; `request` is
# presumably its parameter). Stores the request under its transport's fd and
# publishes the new client count to the UI state.
401 fd = request.transport.fileno()
402 self._connected_clients[fd] = request
403 self.uiState.set("connected-clients", len(self._connected_clients))
404
# Request-completion bookkeeping body (its `def` line is not visible; the code
# reads `request`, `bytesWritten`, `timeConnected` and `fd`, presumably its
# parameters). Cleans up auth state, logs the session, updates the counters and
# fires any Deferred waiting on this fd's disconnect.
406 self.httpauth.cleanupAuth(fd)
407 headers = request.getAllHeaders()
408
409 ip = request.getClientIP()
# Sessions from IPs matched by the configured log filter are not logged.
410 if not self._logfilter or not self._logfilter.isInRange(ip):
411 args = {'ip': ip,
412 'time': time.gmtime(),
413 'method': request.method,
414 'uri': request.uri,
415 'username': '-',
416 'get-parameters': request.args,
417 'clientproto': request.clientproto,
418 'response': request.code,
419 'bytes-sent': bytesWritten,
420 'referer': headers.get('referer', None),
421 'user-agent': headers.get('user-agent', None),
422 'time-connected': timeConnected}
423
# Send the event to every logger; maybeDeferred wraps each call in a Deferred.
424 l = []
425 for logger in self._loggers:
426 l.append(defer.maybeDeferred(
427 logger.event, 'http_session_completed', args))
428 d = defer.DeferredList(l)
429 else:
430 d = defer.succeed(None)
431
# Drop the client and republish the counters.
432 del self._connected_clients[fd]
433
434 self.uiState.set("connected-clients", len(self._connected_clients))
435
436 self._total_bytes_written += bytesWritten
437 self.uiState.set("bytes-transferred", self._total_bytes_written)
438
# Once logging is done, fire the Deferred (if any) stored for this fd by the
# all-clients removal code.
439 def firePendingDisconnect(_):
440 self.debug("Logging completed")
441 if fd in self._pendingDisconnects:
442 pending = self._pendingDisconnects.pop(fd)
443 self.debug("Firing pending disconnect deferred")
444 pending.callback(None)
445 d.addCallback(firePendingDisconnect)
446
# Accessor body (its `def` line is not visible): returns the stream description.
448 return self._description
449
# Accessor body (its `def` line is not visible; presumably getUrl -- confirm).
# Builds the URL from hostname, port and mount point, which already starts with '/'.
451 return "http://%s:%d%s" % (self.hostname, self.port, self.mountPoint)
452
# Stream-data accessor body (its `def` line is not visible in this view).
# Uses the last StreamDataProvider plug when one is configured; otherwise
# returns a default dict with the protocol, description and URL.
# NOTE(review): self.plugs[socket] raises KeyError if the key is absent, whereas
# the logger plugs are read with .get(..., []) -- confirm the key always exists.
454 socket = 'flumotion.component.plugs.streamdata.StreamDataProvider'
455 if self.plugs[socket]:
456 plug = self.plugs[socket][-1]
457 return plug.getStreamData()
458 else:
459 return {
460 'protocol': 'HTTP',
461 'description': self._description,
462 'url' : self.getUrl()
463 }
464
# Load-data accessor body (its `def` line is not visible in this view).
466 """
467 Return a tuple (deltaadded, deltaremoved, bytes_transferred,
468 current_clients, current_load) of our current bandwidth and user values.
469 The deltas and current_load are NOT currently implemented here, we set
470 them as zero.
471 """
# Start from the bytes of finished requests, then add what each in-flight
# transfer has written so far.
472 bytesTransferred = self._total_bytes_written
473 for request in self._connected_clients.values():
474 if request._transfer:
475 bytesTransferred += request._transfer.bytesWritten
476
477 return (0, 0, bytesTransferred, len(self._connected_clients), 0)
478
# Log-rotation body (its `def` line is not visible in this view): calls
# rotate() on every configured logger plug.
480 """
481 Close the logfile, then reopen using the previous logfilename
482 """
483 for logger in self._loggers:
484 self.debug('rotating logger %r' % logger)
485 logger.rotate()
486