Package flumotion :: Package component :: Module padmonitor
[hide private]

Source Code for Module flumotion.component.padmonitor

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_feedcomponent010 -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import gst 
 23   
 24  import time 
 25   
 26  from twisted.internet import reactor, defer 
 27   
 28  from flumotion.common import log, common 
 29   
 30   
class PadMonitor(log.Loggable):
    """
    I monitor data flow on a single GStreamer pad.

    I create two pollers: C{check_poller} calls L{_check_timeout}, which
    installs a buffer probe on the pad, and C{watch_poller} calls
    L{_watch_timeout}, which compares the time of the last probed buffer
    with PAD_MONITOR_TIMEOUT and switches me between active and inactive.

    C{_last_data_time} takes three kinds of values:
     - a negative number: no watch pass has run yet;
     - 0: a watch pass ran and no data has been seen since;
     - a positive number: C{time.time()} of the last probed buffer.

    @cvar PAD_MONITOR_PROBE_FREQUENCY: interval, in seconds, handed to the
                                       poller that installs buffer probes.
    @cvar PAD_MONITOR_TIMEOUT:         interval, in seconds, handed to the
                                       watch poller; also the maximum age
                                       of the last buffer before I am
                                       considered inactive.
    """

    PAD_MONITOR_PROBE_FREQUENCY = 5.0
    PAD_MONITOR_TIMEOUT = PAD_MONITOR_PROBE_FREQUENCY * 2.5

    def __init__(self, pad, name, setActive, setInactive):
        """
        @param pad:         the pad to install buffer probes on.
        @param name:        my name; passed to both callbacks and used in
                            log messages.
        @param setActive:   callable taking my name, called when I become
                            active.
        @param setInactive: callable taking my name, called when I become
                            inactive.
        """
        self.name = name
        self._pad = pad
        self._doSetActive = setActive
        self._doSetInactive = setInactive

        self._last_data_time = -1
        self._active = False
        # True until the first buffer probe fired; see logMessage().
        self._first = True

        # A dict rather than a plain attribute, because python's dict
        # operations are atomic w.r.t. the GIL. Holds at most one entry,
        # "id" -> (deferred, probe id), shared with the probe callback.
        self._probe_id = {}

        # The pollers are created last: everything their callbacks touch
        # has to exist before they can possibly fire.
        self.check_poller = common.Poller(
            self._check_timeout, self.PAD_MONITOR_PROBE_FREQUENCY,
            immediately=True)
        self.watch_poller = common.Poller(
            self._watch_timeout, self.PAD_MONITOR_TIMEOUT)

    def logMessage(self, message, *args):
        """
        Log a message at debug level until the first buffer probe has
        fired, and at log level afterwards.
        """
        if self._first:
            emit = self.debug
        else:
            emit = self.log
        emit(message, *args)

    def isActive(self):
        """
        @returns: whether I currently consider the pad to have data flow.
        """
        return self._active

    def detach(self):
        """
        Stop both pollers and remove the outstanding buffer probe, if any,
        firing its deferred.
        """
        self.check_poller.stop()
        self.watch_poller.stop()

        # Tightly coupled to _check_timeout: whoever pops the "id" entry
        # (this method or the probe callback) owns the probe removal and
        # the firing of the deferred, so neither happens twice.
        pending, probe_id = self._probe_id.pop("id", (None, None))
        if not probe_id:
            return
        self._pad.remove_buffer_probe(probe_id)
        pending.callback(None)

    def _check_timeout(self):
        """
        Install a buffer probe on the pad.

        @returns: a deferred fired with None once the probe has seen a
                  buffer, or once L{detach} removed the probe.
        """

        def probe_cb(pad, buf):
            """
            Periodically scheduled buffer probe, that ensures that we're
            currently actually having dataflow through our eater
            elements.

            Called from GStreamer threads.

            @param pad: The gst.Pad srcpad for one eater in this
                        component.
            @param buf: A gst.Buffer that has arrived on this pad
            """
            self._last_data_time = time.time()

            self.logMessage('buffer probe on %s has timestamp %s', self.name,
                            gst.TIME_ARGS(buf.timestamp))

            deferred, probe_id = self._probe_id.pop("id", (None, None))
            if probe_id:
                # Nothing is popped when detach() already took the entry.
                self._pad.remove_buffer_probe(probe_id)

                # We are not in the reactor thread here, so hand both
                # calls over to it.
                reactor.callFromThread(deferred.callback, None)
                # Data received! Return to happy ASAP:
                reactor.callFromThread(self.watch_poller.run)

            self._first = False

            # let the buffer through
            return True

        probe_done = defer.Deferred()
        # FIXME: this is racy: evaluate RHS, drop GIL, buffer probe
        # fires before __setitem__ in LHS; need a mutex
        # NOTE(review): when that happens probe_cb pops nothing and leaves
        # the probe installed, so presumably the next buffer completes the
        # handshake -- confirm before relying on it.
        self._probe_id['id'] = (probe_done,
                                self._pad.add_buffer_probe(probe_cb))
        return probe_done

    def _watch_timeout(self):
        """
        Compare the time of the last probed buffer with the current time
        and switch between active and inactive when needed.
        """
        self.log('last buffer for %s at %r', self.name, self._last_data_time)

        now = time.time()

        if self._last_data_time < 0:
            # We never received any data in the first timeout period...
            self._last_data_time = 0
            self.setInactive()
            return

        if self._last_data_time == 0:
            # still no data...
            return

        # We received data at some time in the past.
        elapsed = now - self._last_data_time
        timeout = self.PAD_MONITOR_TIMEOUT

        if self._active and elapsed > timeout:
            self.info("No data received on pad %s for > %r seconds, setting "
                      "to hungry", self.name, timeout)
            self.setInactive()
        elif not self._active and elapsed < timeout:
            self.info("Receiving data again on pad %s, flow active",
                      self.name)
            self.setActive()

    def setInactive(self):
        """
        Mark me inactive and call the setInactive callback with my name.
        """
        self._active = False
        self._doSetInactive(self.name)

    def setActive(self):
        """
        Mark me active and call the setActive callback with my name.
        """
        self._active = True
        self._doSetActive(self.name)

class EaterPadMonitor(PadMonitor):
    """
    I am a pad monitor that additionally drives a reconnect poller: it is
    started when I go inactive and stopped when I go active again or when
    I am detached.
    """

    def __init__(self, pad, name, setActive, setInactive,
                 reconnectEater, *args):
        """
        @param pad:            see L{PadMonitor.__init__}
        @param name:           see L{PadMonitor.__init__}
        @param setActive:      see L{PadMonitor.__init__}
        @param setInactive:    see L{PadMonitor.__init__}
        @param reconnectEater: callable handed to the reconnect poller;
                               it is invoked with C{*args}.
        @param args:           positional arguments for reconnectEater.
        """
        PadMonitor.__init__(self, pad, name, setActive, setInactive)

        # Created stopped; setInactive() is what starts it.
        retry = lambda: reconnectEater(*args)
        self._reconnectPoller = common.Poller(
            retry, self.PAD_MONITOR_TIMEOUT, start=False)

    def setInactive(self):
        """
        Go inactive, forget the time of the last buffer and start the
        reconnect poller.
        """
        PadMonitor.setInactive(self)

        # A buffer may have been probed shortly before we got marked
        # inactive. If its timestamp were kept, the next _watch_timeout
        # pass would find it younger than PAD_MONITOR_TIMEOUT and flip us
        # back to active by accident. Resetting to 0 sends that pass down
        # the "still no data" branch instead.
        self._last_data_time = 0

        self._reconnectPoller.start(immediately=True)

    def setActive(self):
        """
        Go active and stop the reconnect poller.
        """
        PadMonitor.setActive(self)
        self._reconnectPoller.stop()

    def detach(self):
        """
        Detach from the pad as L{PadMonitor.detach} does, then stop the
        reconnect poller.
        """
        PadMonitor.detach(self)
        self._reconnectPoller.stop()

class PadMonitorSet(dict, log.Loggable):
    """
    I am a dict of monitor name -> pad monitor.

    I report the state of the set as a whole: I call setInactive when a
    monitor goes inactive while I was active, and setActive when, after
    that, all my monitors are active again.
    """

    def __init__(self, setActive, setInactive):
        """
        @param setActive:   callable without arguments, called when the
                            set as a whole becomes active again.
        @param setInactive: callable without arguments, called when the
                            set as a whole becomes inactive.
        """
        self._doSetActive = setActive
        self._doSetInactive = setInactive
        # Starts out True, so that the initial activation of the monitors
        # does not trigger setActive; see monitorActive() in attach().
        self._wasActive = True

    def attach(self, pad, name, klass=PadMonitor, *args):
        """
        Watch for data flow through this pad periodically.
        If data flow ceases for too long, we turn hungry. If data flow
        resumes, we return to happy.

        @param pad:   the pad to monitor.
        @param name:  the name for the new monitor; must not be in use.
        @param klass: the monitor class to instantiate.
        @param args:  extra positional arguments for klass.
        """

        def monitorActive(monitorName):
            self.info('Pad data flow at %s is active', monitorName)
            if not self.isActive():
                return
            if self._wasActive:
                # Only report active when we previously reported inactive
                # because of an inactive monitor; this keeps the initial
                # activation silent. A curious interface.
                return
            self._wasActive = True
            self._doSetActive()

        def monitorInactive(monitorName):
            self.info('Pad data flow at %s is inactive', monitorName)
            if not self._wasActive:
                # Already reported inactive.
                return
            self._doSetInactive()
            self._wasActive = False

        assert name not in self
        monitor = klass(pad, name, monitorActive, monitorInactive, *args)
        self[monitor.name] = monitor
        self.info("Added pad monitor %s", monitor.name)

    def remove(self, name):
        """
        Remove and detach the monitor with the given name; warn when there
        is no such monitor.
        """
        if name in self:
            self.pop(name).detach()
        else:
            self.warning("No pad monitor with name %s", name)

    def isActive(self):
        """
        @returns: False as soon as one of my monitors is inactive, True
                  otherwise (including when I have no monitors).
        """
        for monitor in self.values():
            if monitor.isActive():
                continue
            return False
        return True