Trees | Indices | Help |
---|
|
1 # -*- Mode: Python; test-case-name: flumotion.test.test_worker_feed -*- 2 # vi:si:et:sw=4:sts=4:ts=4 3 # 4 # Flumotion - a streaming media server 5 # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 6 # All rights reserved. 7 8 # This file may be distributed and/or modified under the terms of 9 # the GNU General Public License version 2 as published by 10 # the Free Software Foundation. 11 # This file is distributed without any warranty; without even the implied 12 # warranty of merchantability or fitness for a particular purpose. 13 # See "LICENSE.GPL" in the source distribution for more information. 14 15 # Licensees having purchased or holding a valid Flumotion Advanced 16 # Streaming Server license may use this file in accordance with the 17 # Flumotion Advanced Streaming Server Commercial License Agreement. 18 # See "LICENSE.Flumotion" in the source distribution for more information. 19 20 # Headers in this file shall remain intact. 21 22 """ 23 implementation of a PB Server through which other components can request 24 to eat from or feed to this worker's components. 25 """ 26 27 from twisted.internet import reactor, defer, main 28 from twisted.python import components, failure, reflect 29 from twisted.spread import pb 30 from twisted.cred import portal 31 32 from flumotion.configure import configure 33 from flumotion.common import log, common, interfaces 34 from flumotion.twisted import checkers, fdserver 35 from flumotion.twisted import portal as fportal 36 from flumotion.twisted import pb as fpb 3739 """ 40 I am the feed server. PHEAR 41 """ 42 43 __implements__ = portal.IRealm 44 45 logCategory = 'dispatcher' 469348 """ 49 @param brain: L{flumotion.worker.worker.WorkerBrain} 50 """ 51 self._brain = brain 52 self._tport = None 53 self.listen(bouncer, portNum)5456 if not self._tport: 57 self.warning('not listening!') 58 return 0 59 return self._tport.getHost().port6062 portal = fportal.BouncerPortal(self, bouncer) 63 factory = pb.PBServerFactory(portal, 64 unsafeTracebacks=unsafeTracebacks) 65 66 tport = reactor.listenWith(fdserver.PassableServerPort, portNum, 67 factory) 68 69 self._tport = tport 70 self.debug('Listening for feed requests on TCP port %d', 71 self.getPortNum())72 77 78 ### IRealm method80 avatar = FeedAvatar(self, avatarId, mind) 81 return (pb.IPerspective, avatar, 82 lambda: self.avatarLogout(avatar))83 86 87 ## proxy these to the brain 9095 """ 96 I am an avatar in a FeedServer for components that log in and request 97 to eat from or feed to one of my components. 98 99 My mind is a reference to a L{FeedMedium} 100 """ 101 logCategory = "feedavatar" 102 remoteLogName = "feedmedium" 103173105 """ 106 """ 107 fpb.Avatar.__init__(self, avatarId) 108 self._transport = None 109 self.feedServer = feedServer 110 self.avatarId = avatarId 111 self.setMind(mind)112114 """ 115 Called when the PB client wants us to send them the given feed. 116 """ 117 # the PB message needs to be sent from the side that has the feeder 118 # for proper switching, so we call back as a reply 119 d = self.mindCallRemote('sendFeedReply', fullFeedId) 120 d.addCallback(self._sendFeedReplyCb, fullFeedId)121123 # compare with startStreaming in prototype 124 # Remove this from the reactor; we mustn't read or write from it from 125 # here on 126 t = self.mind.broker.transport 127 t.stopReading() 128 t.stopWriting() 129 130 # hand off the fd to the component 131 self.debug("Attempting to send FD: %d", t.fileno()) 132 133 (flowName, componentName, feedName) = common.parseFullFeedId(fullFeedId) 134 componentId = common.componentId(flowName, componentName) 135 136 if self.feedServer.feedToFD(componentId, feedName, t.fileno(), 137 self.avatarId): 138 t.keepSocketAlive = True 139 140 # We removed the transport from the reactor before sending the 141 # FD; now we want the socket cleaned up. 142 t.loseConnection()143145 """ 146 Called when the PB client wants to send the given feedId to the 147 given component 148 """ 149 # we need to make sure our result goes back, so only stop reading 150 t = self.mind.broker.transport 151 t.stopReading() 152 reactor.callLater(0, self._doReceiveFeed, fullFeedId)153155 t = self.mind.broker.transport 156 157 self.debug('flushing PB write queue') 158 t.doWrite() 159 self.debug('stop writing to transport') 160 t.stopWriting() 161 162 # hand off the fd to the component 163 self.debug("Attempting to send FD: %d", t.fileno()) 164 165 (flowName, componentName, eaterAlias) = common.parseFullFeedId(fullFeedId) 166 componentId = common.componentId(flowName, componentName) 167 168 if self.feedServer.eatFromFD(componentId, eaterAlias, t.fileno(), 169 self.avatarId): 170 t.keepSocketAlive = True 171 172 t.loseConnection()
Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Thu Aug 7 15:03:05 2008 | http://epydoc.sourceforge.net |