1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 from urllib2 import urlparse
23
24 from twisted.internet import protocol, reactor, address, error, defer
25
26 from twisted.spread import pb
27 from twisted.cred import portal
28
29 from flumotion.common import medium, log, messages
30 from flumotion.twisted import credentials, fdserver, checkers
31 from flumotion.twisted import reflect
32
33 from flumotion.component import component
34 from flumotion.component.component import moods
35
36 import socket, string, os, random
37
38 from flumotion.common.messages import N_
39 T_ = messages.gettexter('flumotion')
40
42 """
43 An Avatar in the porter representing a streamer
44 """
45 - def __init__(self, avatarId, porter, mind):
52
def isAttached(self):
    """
    Tell whether a mind is currently attached to this avatar.

    @returns: True while C{self.mind} is set, False once it is None.
    @rtype:   bool
    """
    # Identity test on purpose: "!= None" would be routed through the
    # mind object's own comparison methods, which may not act like identity.
    return self.mind is not None
55
57 self.debug("porter client %s logging out", self.avatarId)
58 self.mind = None
59
63
67
71
75
77 """
78 A Realm within the Porter that creates Avatars for streamers logging into
79 the porter.
80 """
81 __implements__ = portal.IRealm
82
84 """
85 @param porter: The porter that avatars created from here should use.
86 @type porter: L{Porter}
87 """
88 self.porter = porter
89
98
100
102 """
103 Return the location, login username/password, and listening port
104 and interface for the porter as a tuple (path, username,
105 password, port, interface).
106 """
107 return (self.comp._socketPath, self.comp._username,
108 self.comp._password, self.comp._iptablesPort,
109 self.comp._interface)
110
111 -class Porter(component.BaseComponent, log.Loggable):
112 """
113 The porter optionally sits in front of a set of streamer components.
114 The porter is what actually deals with incoming connections on a TCP socket.
115 It decides which streamer to direct the connection to, then passes the FD
116 (along with some amount of already-read data) to the appropriate streamer.
117 """
118
119 componentMediumClass = PorterMedium
120
122
123
124 self._mappings = {}
125 self._prefixes = {}
126
127 self._socketlistener = None
128
129 self._socketPath = None
130 self._username = None
131 self._password = None
132 self._port = None
133 self._iptablesPort = None
134 self._porterProtocol = None
135
136 self._interface = ''
137
139 """
140 Register a path as being served by a streamer represented by this
141 avatar. Will remove any previous registration at this path.
142
143 @param path: The path to register
144 @type path: str
145 @param avatar: The avatar representing the streamer to direct this path
146 to
147 @type avatar: L{PorterAvatar}
148 """
149 self.debug("Registering porter path \"%s\" to %r" % (path, avatar))
150 if self._mappings.has_key(path):
151 self.warning("Replacing existing mapping for path \"%s\"" % path)
152
153 self._mappings[path] = avatar
154
156 """
157 Attempt to deregister the given path. A deregistration will only be
158 accepted if the mapping is to the avatar passed.
159
160 @param path: The path to deregister
161 @type path: str
162 @param avatar: The avatar representing the streamer being deregistered
163 @type avatar: L{PorterAvatar}
164 """
165 if self._mappings.has_key(path):
166 if self._mappings[path] == avatar:
167 self.debug("Removing porter mapping for \"%s\"" % path)
168 del self._mappings[path]
169 else:
170 self.warning("Mapping not removed: refers to a different avatar")
171 else:
172 self.warning("Mapping not removed: no mapping found")
173
175 """
176 Register a destination for all requests directed to anything beginning
177 with a specified prefix. Where there are multiple matching prefixes, the
178 longest is selected.
179
180 @param avatar: The avatar being registered
181 @type avatar: L{PorterAvatar}
182 """
183
184 self.debug("Setting prefix \"%s\" for porter", prefix)
185 if prefix in self._prefixes:
186 self.warning("Overwriting prefix")
187
188 self._prefixes[prefix] = avatar
189
191 """
192 Attempt to deregister the destination registered for the given prefix.
193 This will only succeed if the prefix is currently mapped to this
194 avatar.
195
196 @param avatar: The avatar being deregistered
197 @type avatar: L{PorterAvatar}
198 """
199 if prefix not in self._prefixes:
200 self.warning("Mapping not removed: no mapping found")
201 return
202
203 if self._prefixes[prefix] == avatar:
204 self.debug("Removing prefix destination from porter")
205 del self._prefixes[prefix]
206 else:
207 self.warning("Not removing prefix destination: expected avatar not found")
208
def findPrefixMatch(self, path):
    """
    Find the avatar registered for the longest prefix that path starts with.

    @param path: The path to look up
    @type  path: str
    @returns: The avatar stored in C{self._prefixes} under the longest
              matching prefix, or None if no registered prefix matches.
    """
    best = None

    for prefix in self._prefixes:
        self.log("Checking: %r, %r", prefix, path)
        if not path.startswith(prefix):
            continue
        # Compare against None rather than relying on truthiness: the empty
        # prefix "" matches every path and must stay a valid candidate.
        if best is None or len(prefix) > len(best):
            best = prefix

    if best is None:
        return None
    return self._prefixes[best]
220
def findDestination(self, path):
    """
    Find a destination Avatar for this path.

    An exact entry in C{self._mappings} takes precedence; only when there
    is none is the prefix table consulted through L{findPrefixMatch}.

    @param path: The path to look up
    @type  path: str
    @returns: The Avatar for this mapping, or None.
    """
    try:
        return self._mappings[path]
    except KeyError:
        # No exact mapping registered for this path.
        return self.findPrefixMatch(path)
231
232
def generateSocketPath(self):
    """
    Generate a socket pathname in an appropriate location

    The name comes from C{tempfile.mkstemp}, so an empty regular file is
    left behind at the returned path; its descriptor is closed before
    returning.

    @returns: a path of the form C{<tmpdir>/flumotion.porter.<random>.<pid>}
    @rtype:   str
    """
    import tempfile

    fd, name = tempfile.mkstemp(suffix='.%d' % os.getpid(),
                                prefix='flumotion.porter.')
    # Only the unique name is wanted, not the open descriptor.
    os.close(fd)

    return name
244
def generateRandomString(self, numchars):
    """
    Generate a random US-ASCII string of length numchars

    @param numchars: number of characters to generate; zero or a negative
                     value yields the empty string
    @type  numchars: int
    @returns: a string made up of the letters A-Z and a-z
    @rtype:   str
    """
    # The porter uses these strings as its login username and password, so
    # draw from the operating system's entropy source instead of the
    # seedable module-level generator.
    rng = random.SystemRandom()
    alphabet = string.ascii_letters

    return "".join([rng.choice(alphabet) for _ in range(numchars)])
255
def have_properties(self):
    """
    Read the component properties and store the porter settings on self.

    When a C{socket-path} property is present, the socket path, username
    and password are all taken from the properties.  Otherwise two random
    12-character credentials and a generated socket path are used.

    @raises KeyError: if C{port} is missing, or if C{socket-path} is given
                      without C{username} or C{password}.
    """
    props = self.config['properties']

    # NOTE(review): presumably moves a legacy 'socket_path' key over to
    # 'socket-path'; confirm against fixRenamedProperties.
    self.fixRenamedProperties(props,
        [('socket_path', 'socket-path')])

    if 'socket-path' in props:
        # Explicitly configured: everything comes from the properties.
        self._socketPath = props['socket-path']
        self._username = props['username']
        self._password = props['password']
    else:
        # Self-configured: invent credentials and a socket location.
        self._username = self.generateRandomString(12)
        self._password = self.generateRandomString(12)
        self._socketPath = self.generateSocketPath()

    self._port = int(props['port'])
    # 'iptables-port' falls back to the listening port when not given.
    self._iptablesPort = int(props.get('iptables-port', self._port))
    self._porterProtocol = props.get('protocol',
        'flumotion.component.misc.porter.porter.HTTPPorterProtocol')
    self._interface = props.get('interface', '')
283
285 d = None
286 if self._socketlistener:
287
288
289 d = self._socketlistener.stopListening()
290 self._socketlistener = None
291 return d
292
294
295 self.have_properties()
296 realm = PorterRealm(self)
297 checker = checkers.FlexibleCredentialsChecker()
298 checker.addUser(self._username, self._password)
299
300 p = portal.Portal(realm, [checker])
301 serverfactory = pb.PBServerFactory(p)
302
303
304
305 try:
306
307
308
309 try:
310 os.unlink(self._socketPath)
311 except:
312 pass
313
314 self._socketlistener = reactor.listenWith(
315 fdserver.FDPort, self._socketPath, serverfactory)
316 self.debug("Now listening on socketPath %s" % self._socketPath)
317 except error.CannotListenError, e:
318 self.warning("Failed to create socket %s" % self._socketPath)
319 m = messages.Error(T_(N_(
320 "Network error: socket path %s is not available."),
321 self._socketPath))
322 self.addMessage(m)
323 self.setMood(moods.sad)
324 return defer.fail(e)
325
326
327
328 try:
329 proto = reflect.namedAny(self._porterProtocol)
330 self.debug("Created proto %r" % proto)
331 except:
332 self.warning("Failed to import protocol '%s', defaulting to HTTP" %
333 self._porterProtocol)
334 proto = HTTPPorterProtocol
335
336
337
338 factory = PorterProtocolFactory(self, proto)
339 try:
340 reactor.listenWith(
341 fdserver.PassableServerPort, self._port, factory,
342 interface=self._interface)
343 self.debug("Now listening on port %d" % self._port)
344 except error.CannotListenError, e:
345 self.warning("Failed to listen on port %d" % self._port)
346 m = messages.Error(T_(N_(
347 "Network error: TCP port %d is not available."), self._port))
348 self.addMessage(m)
349 self.setMood(moods.sad)
350 return defer.fail(e)
351
354 self._porter = porter
355 self.protocol = protocol
356
358 p = self.protocol(self._porter)
359 p.factory = self
360 return p
361
363 """
364 The base porter is capable of accepting HTTP-like protocols (including
365 RTSP) - it reads the first line of a request, and makes the decision
366 solely on that.
367
368 We can't guarantee that we read precisely a line, so the buffer we
369 accumulate will actually be larger than what we actually parse.
370
371 @cvar MAX_SIZE: the maximum number of bytes allowed for the first line
372 @cvar delimiters: a list of valid line delimiters I check for
373 """
374
375 MAX_SIZE = 4096
376
377
378
379 PORTER_CLIENT_TIMEOUT = 30
380
381
382
383
384
385 delimiters = ['\r\n', '\n', '\r']
386
395
397 self._timeoutDC = None
398 self.debug("Timing out porter client after %d seconds",
399 self.PORTER_CLIENT_TIMEOUT)
400 self.transport.loseConnection()
401
403 if self._timeoutDC:
404 self._timeoutDC.cancel()
405 self._timeoutDC = None
406
408 self._buffer = self._buffer + data
409 self.log("Got data, buffer now \"%s\"" % self._buffer)
410
411
412 for delim in self.delimiters:
413 try:
414 line, remaining = self._buffer.split(delim, 1)
415 break
416 except ValueError:
417
418 pass
419 else:
420
421 self.log("No valid delimiter found")
422 if len(self._buffer) > self.MAX_SIZE:
423 self.log("Dropping connection!")
424 return self.transport.loseConnection()
425 else:
426
427
428 return
429
430
431
432 identifier = self.parseLine(line)
433
434 if not identifier:
435 self.log("Couldn't find identifier in first line")
436 return self.transport.loseConnection()
437
438
439
440 destinationAvatar = self._porter.findDestination(identifier)
441
442 if not destinationAvatar or not destinationAvatar.isAttached():
443 if destinationAvatar:
444 self.debug("There was an avatar, but it logged out?")
445 self.debug("No destination avatar found for \"%s\"" % identifier)
446 self.writeNotFoundResponse()
447 return self.transport.loseConnection()
448
449
450
451
452
453
454
455
456
457 self.debug("Attempting to send FD: %d" % self.transport.fileno())
458 destinationAvatar.mind.broker.transport.sendFileDescriptor(
459 self.transport.fileno(), self._buffer)
460
461
462
463
464
465 self.transport.keepSocketAlive = True
466 self.transport.loseConnection()
467
def parseLine(self, line):
    """
    Parse the initial line of the request. Return a string usable for
    uniquely identifying the stream being requested, or None if the request
    is unreadable.

    Subclasses should override this.

    @param line: the first line of the request, without its delimiter
    @type  line: str
    @raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError
477
def writeNotFoundResponse(self):
    """
    Write a response indicating that the requested resource was not found
    in this protocol.

    Subclasses should override this to use the correct protocol.

    @raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError
486
488 scheme = 'http'
489 protos = ["HTTP/1.0", "HTTP/1.1"]
490
509
def writeNotFoundResponse(self):
    """
    Write an HTTP/1.0 "404 Not Found" response to the transport.

    The connection is not closed here.
    """
    self.transport.write("HTTP/1.0 404 Not Found\r\n\r\nResource unknown")
512
514 scheme = 'rtsp'
515 protos = ["RTSP/1.0"]
516
def writeNotFoundResponse(self):
    """
    Write an RTSP/1.0 "404 Not Found" response to the transport.

    The connection is not closed here.
    """
    self.transport.write("RTSP/1.0 404 Not Found\r\n\r\nResource unknown")
519