Package flumotion :: Package component :: Module feedcomponent010
[hide private]

Source Code for Module flumotion.component.feedcomponent010

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_feedcomponent010 -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import gst 
 23  import gobject 
 24   
 25  import os 
 26  import time 
 27   
 28  from twisted.internet import reactor, defer 
 29   
 30  from flumotion.component import component as basecomponent 
 31  from flumotion.common import common, errors, pygobject, messages 
 32  from flumotion.common import gstreamer 
 33  from flumotion.component import feed, padmonitor 
 34  from flumotion.component.feeder import Feeder 
 35  from flumotion.component.eater import Eater 
 36   
 37  from flumotion.common.planet import moods 
 38   
 39  from flumotion.common.messages import N_ 
 40  T_ = messages.gettexter('flumotion') 
 41   
 42   
43 -class FeedComponent(basecomponent.BaseComponent):
44 """ 45 I am a base class for all Flumotion feed components. 46 """ 47 48 # how often to update the UIState feeder statistics 49 FEEDER_STATS_UPDATE_FREQUENCY = 12.5 50 51 logCategory = 'feedcomponent' 52 53 ### BaseComponent interface implementations
54 - def init(self):
55 # add keys for eaters and feeders uiState 56 self.feeders = {} # feeder feedName -> Feeder 57 self.eaters = {} # eater eaterAlias -> Eater 58 self.uiState.addListKey('feeders') 59 self.uiState.addListKey('eaters') 60 61 self.pipeline = None 62 self.pipeline_signals = [] 63 self.bus_signal_id = None 64 self.effects = {} 65 self._feeder_probe_cl = None 66 67 self._pad_monitors = padmonitor.PadMonitorSet( 68 lambda: self.setMood(moods.happy), 69 lambda: self.setMood(moods.hungry)) 70 71 self._clock_slaved = False 72 self.clock_provider = None 73 self._master_clock_info = None # (ip, port, basetime) if we're the 74 # clock master 75 76 self._change_monitor = gstreamer.StateChangeMonitor() 77 78 # multifdsink's get-stats signal had critical bugs before this version 79 self._get_stats_supported = (gstreamer.get_plugin_version('tcp') 80 >= (0, 10, 11, 0))
81
82 - def do_setup(self):
83 """ 84 Sets up component. 85 86 Invokes the L{create_pipeline} and L{set_pipeline} vmethods, 87 which subclasses can provide. 88 """ 89 config = self.config 90 eater_config = config.get('eater', {}) 91 feeder_config = config.get('feed', []) 92 source_config = config.get('source', []) 93 94 self.debug("FeedComponent.do_setup(): eater_config %r", eater_config) 95 self.debug("FeedComponent.do_setup(): feeder_config %r", feeder_config) 96 self.debug("FeedComponent.do_setup(): source_config %r", source_config) 97 # for upgrade of code without restarting managers 98 # this will only be for components whose eater name in registry is 99 # default, so no need to import registry and find eater name 100 if eater_config == {} and source_config != []: 101 eater_config = {'default': [(x, 'default') for x in source_config]} 102 103 for eaterName in eater_config: 104 for feedId, eaterAlias in eater_config[eaterName]: 105 self.eaters[eaterAlias] = Eater(eaterAlias, eaterName) 106 self.uiState.append('eaters', self.eaters[eaterAlias].uiState) 107 108 for feederName in feeder_config: 109 self.feeders[feederName] = Feeder(feederName) 110 self.uiState.append('feeders', 111 self.feeders[feederName].uiState) 112 113 clockMaster = config.get('clock-master', None) 114 if clockMaster: 115 self._clock_slaved = clockMaster != config['avatarId'] 116 else: 117 self._clock_slaved = False 118 119 pipeline = self.create_pipeline() 120 self.connect_feeders(pipeline) 121 self.set_pipeline(pipeline) 122 123 self.debug("FeedComponent.do_setup(): setup finished") 124 125 self.try_start_pipeline() 126 127 # no race, messages marshalled asynchronously via the bus 128 d = self._change_monitor.add(gst.STATE_CHANGE_PAUSED_TO_PLAYING) 129 d.addCallback(lambda x: self.do_pipeline_playing()) 130 return d
131 132 ### FeedComponent interface for subclasses
133 - def create_pipeline(self):
134 """ 135 Subclasses have to implement this method. 136 137 @rtype: L{gst.Pipeline} 138 """ 139 raise NotImplementedError, "subclass must implement create_pipeline"
140
141 - def set_pipeline(self, pipeline):
142 """ 143 Subclasses can override me. 144 They should chain up first. 145 """ 146 if self.pipeline: 147 self.cleanup() 148 self.pipeline = pipeline 149 self._setup_pipeline()
150
151 - def attachPadMonitorToFeeder(self, feederName):
152 elementName = self.feeders[feederName].payName 153 element = self.pipeline.get_by_name(elementName) 154 if not element: 155 raise errors.ComponentError("No such feeder %s" % feederName) 156 157 pad = element.get_pad('src') 158 self._pad_monitors.attach(pad, elementName)
159 160 ### FeedComponent methods
161 - def addEffect(self, effect):
162 self.effects[effect.name] = effect 163 effect.setComponent(self)
164
165 - def connect_feeders(self, pipeline):
166 # Connect to the client-fd-removed signals on each feeder, so we 167 # can clean up properly on removal. 168 def client_fd_removed(sink, fd, feeder): 169 # Called (as a signal callback) when the FD is no longer in 170 # use by multifdsink. 171 # This will call the registered callable on the fd. 172 # Called from GStreamer threads. 173 self.debug("cleaning up fd %d", fd) 174 feeder.clientDisconnected(fd)
175 176 for feeder in self.feeders.values(): 177 element = pipeline.get_by_name(feeder.elementName) 178 element.connect('client-fd-removed', client_fd_removed, 179 feeder) 180 self.debug("Connected to client-fd-removed on %r", feeder)
181
182 - def get_pipeline(self):
183 return self.pipeline
184
185 - def do_pipeline_playing(self):
186 """ 187 Invoked when the pipeline has changed the state to playing. 188 The default implementation sets the component's mood to HAPPY. 189 """ 190 self.setMood(moods.happy)
191
192 - def make_message_for_gstreamer_error(self, gerror, debug):
193 """Make a flumotion error message to show to the user. 194 195 This method may be overridden by components that have special 196 knowledge about potential errors. If the component does not know 197 about the error, it can chain up to this implementation, which 198 will make a generic message. 199 200 @param gerror: The GError from the error message posted on the 201 GStreamer message bus. 202 @type gerror: L{gst.GError} 203 @param debug: A string with debugging information. 204 @type debug: str 205 206 @returns: A L{flumotion.common.messages.Message} to show to the 207 user. 208 """ 209 # generate a unique id 210 mid = "%s-%s-%d" % (self.name, gerror.domain, gerror.code) 211 m = messages.Error(T_(N_( 212 "Internal GStreamer error.")), 213 debug="%s\n%s: %d\n%s" % ( 214 gerror.message, gerror.domain, gerror.code, debug), 215 id=mid, priority=40) 216 return m
217
218 - def bus_message_received_cb(self, bus, message):
219 def state_changed(): 220 if src == self.pipeline: 221 old, new, pending = message.parse_state_changed() 222 self._change_monitor.state_changed(old, new)
223 224 def error(): 225 gerror, debug = message.parse_error() 226 self.warning('element %s error %s %s', 227 src.get_path_string(), gerror, debug) 228 self.setMood(moods.sad) 229 m = self.make_message_for_gstreamer_error(gerror, debug) 230 self.state.append('messages', m) 231 self._change_monitor.have_error(self.pipeline.get_state(), 232 message) 233 234 def eos(): 235 name = src.get_name() 236 if name in self._pad_monitors: 237 self.info('End of stream in element %s', name) 238 self._pad_monitors[name].setInactive() 239 else: 240 self.info("We got an eos from %s", name) 241 242 def default(): 243 self.log('message received: %r', message) 244 245 handlers = {gst.MESSAGE_STATE_CHANGED: state_changed, 246 gst.MESSAGE_ERROR: error, 247 gst.MESSAGE_EOS: eos} 248 t = message.type 249 src = message.src 250 handlers.get(t, default)() 251 return True 252
253 - def install_eater_continuity_watch(self, eaterWatchElements):
254 """Watch a set of elements for discontinuity messages. 255 256 @param eaterWatchElements: the set of elements to watch for 257 discontinuities. 258 @type eaterWatchElements: Dict of elementName => Eater. 259 """ 260 def on_element_message(bus, message): 261 src = message.src 262 name = src.get_name() 263 if name in eaterWatchElements: 264 eater = eaterWatchElements[name] 265 s = message.structure 266 def timestampDiscont(): 267 prevTs = s["prev-timestamp"] 268 prevDuration = s["prev-duration"] 269 curTs = s["cur-timestamp"] 270 discont = curTs - (prevTs + prevDuration) 271 dSeconds = discont / float(gst.SECOND) 272 self.debug("we have a discont on eater %s of %f s " 273 "between %s and %s ", eater.eaterAlias, 274 dSeconds, gst.TIME_ARGS(prevTs), 275 gst.TIME_ARGS(curTs)) 276 eater.timestampDiscont(dSeconds, 277 float(curTs) / float(gst.SECOND))
278 279 def offsetDiscont(): 280 prevOffsetEnd = s["prev-offset-end"] 281 curOffset = s["cur-offset"] 282 discont = curOffset - prevOffsetEnd 283 self.debug("we have a discont on eater %s of %d " 284 "units between %d and %d ", 285 eater.eaterAlias, discont, prevOffsetEnd, 286 curOffset) 287 eater.offsetDiscont(discont, curOffset) 288 289 handlers = {'imperfect-timestamp': timestampDiscont, 290 'imperfect-offset': offsetDiscont} 291 if s.get_name() in handlers: 292 handlers[s.get_name()]() 293 294 # we know that there is a signal watch already installed 295 bus = self.pipeline.get_bus() 296 # never gets cleaned up; does that matter? 297 bus.connect("message::element", on_element_message) 298
299 - def install_eater_event_probes(self, eater):
300 def fdsrc_event(pad, event): 301 # An event probe used to consume unwanted EOS events on eaters. 302 # Called from GStreamer threads. 303 if event.type == gst.EVENT_EOS: 304 self.info('End of stream for eater %s, disconnect will be ' 305 'triggered', eater.eaterAlias) 306 # We swallow it because otherwise our component acts on the EOS 307 # and we can't recover from that later. Instead, fdsrc will be 308 # taken out and given a new fd on the next eatFromFD call. 309 return False 310 return True
311 312 def depay_event(pad, event): 313 # An event probe used to consume unwanted duplicate 314 # newsegment events. 315 # Called from GStreamer threads. 316 if event.type == gst.EVENT_NEWSEGMENT: 317 # We do this because we know gdppay/gdpdepay screw up on 2nd 318 # newsegments (unclear what the original reason for this 319 # was, perhaps #349204) 320 if getattr(eater, '_gotFirstNewSegment', False): 321 self.info("Subsequent new segment event received on " 322 "depay on eater %s", eater.eaterAlias) 323 # swallow (gulp) 324 return False 325 else: 326 eater._gotFirstNewSegment = True 327 return True 328 329 self.debug('adding event probe for eater %s', eater.eaterAlias) 330 fdsrc = self.get_element(eater.elementName) 331 fdsrc.get_pad("src").add_event_probe(fdsrc_event) 332 if gstreamer.get_plugin_version('gdp') < (0, 10, 10, 1): 333 depay = self.get_element(eater.depayName) 334 depay.get_pad("src").add_event_probe(depay_event) 335
336 - def _setup_pipeline(self):
337 self.debug('setup_pipeline()') 338 assert self.bus_signal_id == None 339 340 self.pipeline.set_name('pipeline-' + self.getName()) 341 bus = self.pipeline.get_bus() 342 bus.add_signal_watch() 343 self.bus_signal_id = bus.connect('message', 344 self.bus_message_received_cb) 345 sig_id = self.pipeline.connect('deep-notify', 346 gstreamer.verbose_deep_notify_cb, self) 347 self.pipeline_signals.append(sig_id) 348 349 # set to ready so that multifdsinks can always receive fds, even 350 # if the pipeline has a delayed start due to clock slaving 351 self.pipeline.set_state(gst.STATE_READY) 352 353 # start checking feeders, if we have a sufficiently recent multifdsink 354 if self._get_stats_supported: 355 self._feeder_probe_cl = reactor.callLater( 356 self.FEEDER_STATS_UPDATE_FREQUENCY, self._feeder_probe_calllater) 357 else: 358 self.warning("Feeder statistics unavailable, your " 359 "gst-plugins-base is too old") 360 m = messages.Warning(T_(N_( 361 "Your gst-plugins-base is too old, so " 362 "feeder statistics will be unavailable.")), 363 id='multifdsink') 364 m.add(T_(N_( 365 "Please upgrade '%s' to version %s."), 'gst-plugins-base', 366 '0.10.11')) 367 self.addMessage(m) 368 369 for eater in self.eaters.values(): 370 self.install_eater_event_probes(eater) 371 pad = self.get_element(eater.elementName).get_pad('src') 372 self._pad_monitors.attach(pad, eater.elementName, 373 padmonitor.EaterPadMonitor, 374 self.reconnectEater, 375 eater.eaterAlias) 376 eater.setPadMonitor(self._pad_monitors[eater.elementName])
377
378 - def stop_pipeline(self):
379 if not self.pipeline: 380 return 381 382 if self.clock_provider: 383 self.clock_provider.set_property('active', False) 384 self.clock_provider = None 385 retval = self.pipeline.set_state(gst.STATE_NULL) 386 if retval != gst.STATE_CHANGE_SUCCESS: 387 self.warning('Setting pipeline to NULL failed')
388
389 - def cleanup(self):
390 self.debug("cleaning up") 391 392 assert self.pipeline != None 393 394 self.stop_pipeline() 395 # Disconnect signals 396 map(self.pipeline.disconnect, self.pipeline_signals) 397 self.pipeline_signals = [] 398 if self.bus_signal_id: 399 self.pipeline.get_bus().disconnect(self.bus_signal_id) 400 self.pipeline.get_bus().remove_signal_watch() 401 self.bus_signal_id = None 402 self.pipeline = None 403 404 if self._feeder_probe_cl: 405 self._feeder_probe_cl.cancel() 406 self._feeder_probe_cl = None 407 408 # clean up checkEater callLaters 409 for eater in self.eaters.values(): 410 self._pad_monitors.remove(eater.elementName) 411 eater.setPadMonitor(None)
412
413 - def do_stop(self):
414 self.debug('Stopping') 415 if self.pipeline: 416 self.cleanup() 417 self.debug('Stopped') 418 return defer.succeed(None)
419
420 - def set_master_clock(self, ip, port, base_time):
421 self.debug("Master clock set to %s:%d with base_time %s", ip, port, 422 gst.TIME_ARGS(base_time)) 423 424 assert self._clock_slaved 425 if self._master_clock_info == (ip, port, base_time): 426 self.debug("Same master clock info, returning directly") 427 return defer.succeed(None) 428 elif self._master_clock_info: 429 self.stop_pipeline() 430 431 self._master_clock_info = ip, port, base_time 432 433 clock = gst.NetClientClock(None, ip, port, base_time) 434 # disable the pipeline's management of base_time -- we're going 435 # to set it ourselves. 436 self.pipeline.set_new_stream_time(gst.CLOCK_TIME_NONE) 437 self.pipeline.set_base_time(base_time) 438 self.pipeline.use_clock(clock) 439 440 self.try_start_pipeline()
441
442 - def get_master_clock(self):
443 """ 444 Return the connection details for the network clock provided by 445 this component, if any. 446 """ 447 if self.clock_provider: 448 ip, port, base_time = self._master_clock_info 449 return ip, port, base_time 450 else: 451 return None
452
453 - def provide_master_clock(self, port):
454 """ 455 Tell the component to provide a master clock on the given port. 456 457 @returns: a deferred firing a (ip, port, base_time) triple. 458 """ 459 def pipelinePaused(r): 460 clock = self.pipeline.get_clock() 461 # make sure the pipeline sticks with this clock 462 self.pipeline.use_clock(clock) 463 464 self.clock_provider = gst.NetTimeProvider(clock, None, port) 465 realport = self.clock_provider.get_property('port') 466 467 base_time = self.pipeline.get_base_time() 468 469 self.debug('provided master clock from %r, base time %s', 470 clock, gst.TIME_ARGS(base_time)) 471 472 if self.medium: 473 # FIXME: This isn't always correct. We need a more flexible API, 474 # and a proper network map, to do this. Even then, it's not 475 # always going to be possible. 476 ip = self.medium.getIP() 477 else: 478 ip = "127.0.0.1" 479 480 self._master_clock_info = (ip, realport, base_time) 481 return self.get_master_clock()
482 483 assert self.pipeline 484 assert not self._clock_slaved 485 (ret, state, pending) = self.pipeline.get_state(0) 486 if state != gst.STATE_PAUSED and state != gst.STATE_PLAYING: 487 self.debug("pipeline still spinning up: %r", state) 488 d = self._change_monitor.add(gst.STATE_CHANGE_READY_TO_PAUSED) 489 d.addCallback(pipelinePaused) 490 return d 491 elif self.clock_provider: 492 self.debug("returning existing master clock info") 493 return defer.succeed(self.get_master_clock()) 494 else: 495 return defer.maybeDeferred(pipelinePaused, None) 496 497 ### BaseComponent interface implementation
498 - def try_start_pipeline(self):
499 """ 500 Tell the component to start. 501 Whatever is using the component is responsible for making sure all 502 eaters have received their file descriptor to eat from. 503 """ 504 (ret, state, pending) = self.pipeline.get_state(0) 505 if state == gst.STATE_PLAYING: 506 self.log('already PLAYING') 507 return 508 509 if self._clock_slaved and not self._master_clock_info: 510 self.debug("Missing master clock info, deferring set to PLAYING") 511 return 512 513 for eater in self.eaters.values(): 514 if not eater.fd: 515 self.debug('eater %s not yet connected, deferring set to ' 516 'PLAYING', eater.eaterAlias) 517 return 518 519 self.debug("Setting pipeline %r to GST_STATE_PLAYING", self.pipeline) 520 self.pipeline.set_state(gst.STATE_PLAYING)
521
522 - def _feeder_probe_calllater(self):
523 for feedId, feeder in self.feeders.items(): 524 feederElement = self.get_element(feeder.elementName) 525 for client in feeder.getClients(): 526 # a currently disconnected client will have fd None 527 if client.fd is not None: 528 array = feederElement.emit('get-stats', client.fd) 529 if len(array) == 0: 530 # There is an unavoidable race here: we can't know 531 # whether the fd has been removed from multifdsink. 532 # However, if we call get-stats on an fd that 533 # multifdsink doesn't know about, we just get a 0-length 534 # array. We ensure that we don't reuse the FD too soon 535 # so this can't result in calling this on a valid but 536 # WRONG fd 537 self.debug('Feeder element for feed %s does not know ' 538 'client fd %d' % (feedId, client.fd)) 539 else: 540 client.setStats(array) 541 self._feeder_probe_cl = reactor.callLater(self.FEEDER_STATS_UPDATE_FREQUENCY, 542 self._feeder_probe_calllater)
543
544 - def unblock_eater(self, eaterAlias):
545 """ 546 After this function returns, the stream lock for this eater must have 547 been released. If your component needs to do something here, override 548 this method. 549 """ 550 pass
551
552 - def get_element(self, element_name):
553 """Get an element out of the pipeline. 554 555 If it is possible that the component has not yet been set up, 556 the caller needs to check if self.pipeline is actually set. 557 """ 558 assert self.pipeline 559 self.log('Looking up element %r in pipeline %r', 560 element_name, self.pipeline) 561 element = self.pipeline.get_by_name(element_name) 562 if not element: 563 self.warning("No element named %r in pipeline", element_name) 564 return element
565
566 - def get_element_property(self, element_name, property):
567 'Gets a property of an element in the GStreamer pipeline.' 568 self.debug("%s: getting property %s of element %s" % (self.getName(), property, element_name)) 569 element = self.get_element(element_name) 570 if not element: 571 msg = "Element '%s' does not exist" % element_name 572 self.warning(msg) 573 raise errors.PropertyError(msg) 574 575 self.debug('getting property %s on element %s' % (property, element_name)) 576 try: 577 value = element.get_property(property) 578 except (ValueError, TypeError): 579 msg = "Property '%s' on element '%s' does not exist" % (property, element_name) 580 self.warning(msg) 581 raise errors.PropertyError(msg) 582 583 # param enums and enums need to be returned by integer value 584 if isinstance(value, gobject.GEnum): 585 value = int(value) 586 587 return value
588
589 - def set_element_property(self, element_name, property, value):
590 'Sets a property on an element in the GStreamer pipeline.' 591 self.debug("%s: setting property %s of element %s to %s" % ( 592 self.getName(), property, element_name, value)) 593 element = self.get_element(element_name) 594 if not element: 595 msg = "Element '%s' does not exist" % element_name 596 self.warning(msg) 597 raise errors.PropertyError(msg) 598 599 self.debug('setting property %s on element %r to %s' % 600 (property, element_name, value)) 601 pygobject.gobject_set_property(element, property, value)
602 603 ### methods to connect component eaters and feeders
604 - def reconnectEater(self, eaterAlias):
605 if not self.medium: 606 self.debug("Can't reconnect eater %s, running " 607 "without a medium", eaterAlias) 608 return 609 610 self.eaters[eaterAlias].disconnected() 611 self.medium.connectEater(eaterAlias)
612
613 - def feedToFD(self, feedName, fd, cleanup, eaterId=None):
614 """ 615 @param feedName: name of the feed to feed to the given fd. 616 @type feedName: str 617 @param fd: the file descriptor to feed to 618 @type fd: int 619 @param cleanup: the function to call when the FD is no longer feeding 620 @type cleanup: callable 621 """ 622 self.debug('FeedToFD(%s, %d)', feedName, fd) 623 624 # We must have a pipeline in READY or above to do this. Do a 625 # non-blocking (zero timeout) get_state. 626 if not self.pipeline or self.pipeline.get_state(0)[1] == gst.STATE_NULL: 627 self.warning('told to feed %s to fd %d, but pipeline not ' 628 'running yet', feedName, fd) 629 cleanup(fd) 630 # can happen if we are restarting but the other component is 631 # happy; assume other side will reconnect later 632 return 633 634 if feedName not in self.feeders: 635 msg = "Cannot find feeder named '%s'" % feedName 636 mid = "feedToFD-%s" % feedName 637 m = messages.Warning(T_(N_("Internal Flumotion error.")), 638 debug=msg, id=mid, priority=40) 639 self.state.append('messages', m) 640 self.warning(msg) 641 cleanup(fd) 642 return False 643 644 feeder = self.feeders[feedName] 645 element = self.get_element(feeder.elementName) 646 assert element 647 clientId = eaterId or ('client-%d' % fd) 648 element.emit('add', fd) 649 feeder.clientConnected(clientId, fd, cleanup)
650
651 - def eatFromFD(self, eaterAlias, feedId, fd):
652 """ 653 Tell the component to eat the given feedId from the given fd. 654 The component takes over the ownership of the fd, closing it when 655 no longer eating. 656 657 @param eaterAlias: the alias of the eater 658 @type eaterAlias: str 659 @param feedId: feed id (componentName:feedName) to eat from through 660 the given fd 661 @type feedId: str 662 @param fd: the file descriptor to eat from 663 @type fd: int 664 """ 665 self.debug('EatFromFD(%s, %s, %d)', eaterAlias, feedId, fd) 666 667 if not self.pipeline: 668 self.warning('told to eat %s from fd %d, but pipeline not ' 669 'running yet', feedId, fd) 670 # can happen if we are restarting but the other component is 671 # happy; assume other side will reconnect later 672 os.close(fd) 673 return 674 675 if eaterAlias not in self.eaters: 676 self.warning('Unknown eater alias: %s', eaterAlias) 677 os.close(fd) 678 return 679 680 eater = self.eaters[eaterAlias] 681 element = self.get_element(eater.elementName) 682 if not element: 683 self.warning('Eater element %s not found', eater.elementName) 684 os.close(fd) 685 return 686 687 # fdsrc only switches to the new fd in ready or below 688 (result, current, pending) = element.get_state(0L) 689 pipeline_playing = current not in [gst.STATE_NULL, gst.STATE_READY] 690 if pipeline_playing: 691 self.debug('eater %s in state %r, kidnapping it', 692 eaterAlias, current) 693 694 # we unlink fdsrc from its peer, take it out of the pipeline 695 # so we can set it to READY without having it send EOS, 696 # then switch fd and put it back in. 697 # To do this safely, we first block fdsrc:src, then let the 698 # component do any neccesary unlocking (needed for multi-input 699 # elements) 700 srcpad = element.get_pad('src') 701 702 def _block_cb(pad, blocked): 703 pass
704 srcpad.set_blocked_async(True, _block_cb) 705 self.unblock_eater(eaterAlias) 706 707 # Now, we can switch FD with this mess 708 sinkpad = srcpad.get_peer() 709 srcpad.unlink(sinkpad) 710 parent = element.get_parent() 711 parent.remove(element) 712 self.log("setting to ready") 713 element.set_state(gst.STATE_READY) 714 self.log("setting to ready complete!!!") 715 old = element.get_property('fd') 716 self.log("Closing old fd %d", old) 717 os.close(old) 718 element.set_property('fd', fd) 719 parent.add(element) 720 srcpad.link(sinkpad) 721 element.set_state(gst.STATE_PLAYING) 722 # We're done; unblock the pad 723 srcpad.set_blocked_async(False, _block_cb) 724 else: 725 element.set_property('fd', fd) 726 727 # update our eater uiState, saying that we are eating from a 728 # possibly new feedId 729 eater.connected(fd, feedId) 730 731 if not pipeline_playing: 732 self.try_start_pipeline() 733