Package flumotion :: Package worker :: Module feedserver
[hide private]

Source Code for Module flumotion.worker.feedserver

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_worker_feed -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  implementation of a PB Server through which other components can request 
 24  to eat from or feed to this worker's components. 
 25  """ 
 26   
 27  from twisted.internet import reactor, defer, main 
 28  from twisted.python import components, failure, reflect 
 29  from twisted.spread import pb 
 30  from twisted.cred import portal 
 31   
 32  from flumotion.configure import configure 
 33  from flumotion.common import log, common, interfaces 
 34  from flumotion.twisted import checkers, fdserver 
 35  from flumotion.twisted import portal as fportal 
 36  from flumotion.twisted import pb as fpb 
 37   
38 -class FeedServer(log.Loggable):
39 """ 40 I am the feed server. PHEAR 41 """ 42 43 __implements__ = portal.IRealm 44 45 logCategory = 'dispatcher' 46
47 - def __init__(self, brain, bouncer, portNum):
48 """ 49 @param brain: L{flumotion.worker.worker.WorkerBrain} 50 """ 51 self._brain = brain 52 self._tport = None 53 self.listen(bouncer, portNum)
54
55 - def getPortNum(self):
56 if not self._tport: 57 self.warning('not listening!') 58 return 0 59 return self._tport.getHost().port
60
61 - def listen(self, bouncer, portNum, unsafeTracebacks=0):
62 portal = fportal.BouncerPortal(self, bouncer) 63 factory = pb.PBServerFactory(portal, 64 unsafeTracebacks=unsafeTracebacks) 65 66 tport = reactor.listenWith(fdserver.PassableServerPort, portNum, 67 factory) 68 69 self._tport = tport 70 self.debug('Listening for feed requests on TCP port %d', 71 self.getPortNum())
72
73 - def shutdown(self):
74 d = self._tport.stopListening() 75 self._tport = None 76 return d
77 78 ### IRealm method
79 - def requestAvatar(self, avatarId, keycard, mind, *ifaces):
80 avatar = FeedAvatar(self, avatarId, mind) 81 return (pb.IPerspective, avatar, 82 lambda: self.avatarLogout(avatar))
83
84 - def avatarLogout(self, avatar):
85 self.debug('feed avatar logged out: %s', avatar.avatarId)
86 87 ## proxy these to the brain
88 - def feedToFD(self, componentId, feedId, fd, eaterId):
89 return self._brain.feedToFD(componentId, feedId, fd, eaterId)
90
91 - def eatFromFD(self, componentId, eaterAlias, fd, feedId):
92 return self._brain.eatFromFD(componentId, eaterAlias, fd, feedId)
93
94 -class FeedAvatar(fpb.Avatar):
95 """ 96 I am an avatar in a FeedServer for components that log in and request 97 to eat from or feed to one of my components. 98 99 My mind is a reference to a L{FeedMedium} 100 """ 101 logCategory = "feedavatar" 102 remoteLogName = "feedmedium" 103
104 - def __init__(self, feedServer, avatarId, mind):
105 """ 106 """ 107 fpb.Avatar.__init__(self, avatarId) 108 self._transport = None 109 self.feedServer = feedServer 110 self.avatarId = avatarId 111 self.setMind(mind)
112
113 - def perspective_sendFeed(self, fullFeedId):
114 """ 115 Called when the PB client wants us to send them the given feed. 116 """ 117 # the PB message needs to be sent from the side that has the feeder 118 # for proper switching, so we call back as a reply 119 d = self.mindCallRemote('sendFeedReply', fullFeedId) 120 d.addCallback(self._sendFeedReplyCb, fullFeedId)
121
122 - def _sendFeedReplyCb(self, result, fullFeedId):
123 # compare with startStreaming in prototype 124 # Remove this from the reactor; we mustn't read or write from it from 125 # here on 126 t = self.mind.broker.transport 127 t.stopReading() 128 t.stopWriting() 129 130 # hand off the fd to the component 131 self.debug("Attempting to send FD: %d", t.fileno()) 132 133 (flowName, componentName, feedName) = common.parseFullFeedId(fullFeedId) 134 componentId = common.componentId(flowName, componentName) 135 136 if self.feedServer.feedToFD(componentId, feedName, t.fileno(), 137 self.avatarId): 138 t.keepSocketAlive = True 139 140 # We removed the transport from the reactor before sending the 141 # FD; now we want the socket cleaned up. 142 t.loseConnection()
143
144 - def perspective_receiveFeed(self, fullFeedId):
145 """ 146 Called when the PB client wants to send the given feedId to the 147 given component 148 """ 149 # we need to make sure our result goes back, so only stop reading 150 t = self.mind.broker.transport 151 t.stopReading() 152 reactor.callLater(0, self._doReceiveFeed, fullFeedId)
153
154 - def _doReceiveFeed(self, fullFeedId):
155 t = self.mind.broker.transport 156 157 self.debug('flushing PB write queue') 158 t.doWrite() 159 self.debug('stop writing to transport') 160 t.stopWriting() 161 162 # hand off the fd to the component 163 self.debug("Attempting to send FD: %d", t.fileno()) 164 165 (flowName, componentName, eaterAlias) = common.parseFullFeedId(fullFeedId) 166 componentId = common.componentId(flowName, componentName) 167 168 if self.feedServer.eatFromFD(componentId, eaterAlias, t.fileno(), 169 self.avatarId): 170 t.keepSocketAlive = True 171 172 t.loseConnection()
173