1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import gst
23 import gobject
24
25 import os
26 import time
27
28 from twisted.internet import reactor, defer
29
30 from flumotion.component import component as basecomponent
31 from flumotion.common import common, errors, pygobject, messages
32 from flumotion.common import gstreamer
33 from flumotion.component import feed, padmonitor
34 from flumotion.component.feeder import Feeder
35 from flumotion.component.eater import Eater
36
37 from flumotion.common.planet import moods
38
39 from flumotion.common.messages import N_
40 T_ = messages.gettexter('flumotion')
41
42
44 """
45 I am a base class for all Flumotion feed components.
46 """
47
48
49 FEEDER_STATS_UPDATE_FREQUENCY = 12.5
50
51 logCategory = 'feedcomponent'
52
53
55
56 self.feeders = {}
57 self.eaters = {}
58 self.uiState.addListKey('feeders')
59 self.uiState.addListKey('eaters')
60
61 self.pipeline = None
62 self.pipeline_signals = []
63 self.bus_signal_id = None
64 self.effects = {}
65 self._feeder_probe_cl = None
66
67 self._pad_monitors = padmonitor.PadMonitorSet(
68 lambda: self.setMood(moods.happy),
69 lambda: self.setMood(moods.hungry))
70
71 self._clock_slaved = False
72 self.clock_provider = None
73 self._master_clock_info = None
74
75
76 self._change_monitor = gstreamer.StateChangeMonitor()
77
78
79 self._get_stats_supported = (gstreamer.get_plugin_version('tcp')
80 >= (0, 10, 11, 0))
81
83 """
84 Sets up component.
85
86 Invokes the L{create_pipeline} and L{set_pipeline} vmethods,
87 which subclasses can provide.
88 """
89 config = self.config
90 eater_config = config.get('eater', {})
91 feeder_config = config.get('feed', [])
92 source_config = config.get('source', [])
93
94 self.debug("FeedComponent.do_setup(): eater_config %r", eater_config)
95 self.debug("FeedComponent.do_setup(): feeder_config %r", feeder_config)
96 self.debug("FeedComponent.do_setup(): source_config %r", source_config)
97
98
99
100 if eater_config == {} and source_config != []:
101 eater_config = {'default': [(x, 'default') for x in source_config]}
102
103 for eaterName in eater_config:
104 for feedId, eaterAlias in eater_config[eaterName]:
105 self.eaters[eaterAlias] = Eater(eaterAlias, eaterName)
106 self.uiState.append('eaters', self.eaters[eaterAlias].uiState)
107
108 for feederName in feeder_config:
109 self.feeders[feederName] = Feeder(feederName)
110 self.uiState.append('feeders',
111 self.feeders[feederName].uiState)
112
113 clockMaster = config.get('clock-master', None)
114 if clockMaster:
115 self._clock_slaved = clockMaster != config['avatarId']
116 else:
117 self._clock_slaved = False
118
119 pipeline = self.create_pipeline()
120 self.connect_feeders(pipeline)
121 self.set_pipeline(pipeline)
122
123 self.debug("FeedComponent.do_setup(): setup finished")
124
125 self.try_start_pipeline()
126
127
128 d = self._change_monitor.add(gst.STATE_CHANGE_PAUSED_TO_PLAYING)
129 d.addCallback(lambda x: self.do_pipeline_playing())
130 return d
131
132
134 """
135 Subclasses have to implement this method.
136
137 @rtype: L{gst.Pipeline}
138 """
139 raise NotImplementedError, "subclass must implement create_pipeline"
140
150
152 elementName = self.feeders[feederName].payName
153 element = self.pipeline.get_by_name(elementName)
154 if not element:
155 raise errors.ComponentError("No such feeder %s" % feederName)
156
157 pad = element.get_pad('src')
158 self._pad_monitors.attach(pad, elementName)
159
160
164
175
176 for feeder in self.feeders.values():
177 element = pipeline.get_by_name(feeder.elementName)
178 element.connect('client-fd-removed', client_fd_removed,
179 feeder)
180 self.debug("Connected to client-fd-removed on %r", feeder)
181
184
186 """
187 Invoked when the pipeline has changed the state to playing.
188 The default implementation sets the component's mood to HAPPY.
189 """
190 self.setMood(moods.happy)
191
193 """Make a flumotion error message to show to the user.
194
195 This method may be overridden by components that have special
196 knowledge about potential errors. If the component does not know
197 about the error, it can chain up to this implementation, which
198 will make a generic message.
199
200 @param gerror: The GError from the error message posted on the
201 GStreamer message bus.
202 @type gerror: L{gst.GError}
203 @param debug: A string with debugging information.
204 @type debug: str
205
206 @returns: A L{flumotion.common.messages.Message} to show to the
207 user.
208 """
209
210 mid = "%s-%s-%d" % (self.name, gerror.domain, gerror.code)
211 m = messages.Error(T_(N_(
212 "Internal GStreamer error.")),
213 debug="%s\n%s: %d\n%s" % (
214 gerror.message, gerror.domain, gerror.code, debug),
215 id=mid, priority=40)
216 return m
217
223
224 def error():
225 gerror, debug = message.parse_error()
226 self.warning('element %s error %s %s',
227 src.get_path_string(), gerror, debug)
228 self.setMood(moods.sad)
229 m = self.make_message_for_gstreamer_error(gerror, debug)
230 self.state.append('messages', m)
231 self._change_monitor.have_error(self.pipeline.get_state(),
232 message)
233
234 def eos():
235 name = src.get_name()
236 if name in self._pad_monitors:
237 self.info('End of stream in element %s', name)
238 self._pad_monitors[name].setInactive()
239 else:
240 self.info("We got an eos from %s", name)
241
242 def default():
243 self.log('message received: %r', message)
244
245 handlers = {gst.MESSAGE_STATE_CHANGED: state_changed,
246 gst.MESSAGE_ERROR: error,
247 gst.MESSAGE_EOS: eos}
248 t = message.type
249 src = message.src
250 handlers.get(t, default)()
251 return True
252
254 """Watch a set of elements for discontinuity messages.
255
256 @param eaterWatchElements: the set of elements to watch for
257 discontinuities.
258 @type eaterWatchElements: Dict of elementName => Eater.
259 """
260 def on_element_message(bus, message):
261 src = message.src
262 name = src.get_name()
263 if name in eaterWatchElements:
264 eater = eaterWatchElements[name]
265 s = message.structure
266 def timestampDiscont():
267 prevTs = s["prev-timestamp"]
268 prevDuration = s["prev-duration"]
269 curTs = s["cur-timestamp"]
270 discont = curTs - (prevTs + prevDuration)
271 dSeconds = discont / float(gst.SECOND)
272 self.debug("we have a discont on eater %s of %f s "
273 "between %s and %s ", eater.eaterAlias,
274 dSeconds, gst.TIME_ARGS(prevTs),
275 gst.TIME_ARGS(curTs))
276 eater.timestampDiscont(dSeconds,
277 float(curTs) / float(gst.SECOND))
278
279 def offsetDiscont():
280 prevOffsetEnd = s["prev-offset-end"]
281 curOffset = s["cur-offset"]
282 discont = curOffset - prevOffsetEnd
283 self.debug("we have a discont on eater %s of %d "
284 "units between %d and %d ",
285 eater.eaterAlias, discont, prevOffsetEnd,
286 curOffset)
287 eater.offsetDiscont(discont, curOffset)
288
289 handlers = {'imperfect-timestamp': timestampDiscont,
290 'imperfect-offset': offsetDiscont}
291 if s.get_name() in handlers:
292 handlers[s.get_name()]()
293
294
295 bus = self.pipeline.get_bus()
296
297 bus.connect("message::element", on_element_message)
298
300 def fdsrc_event(pad, event):
301
302
303 if event.type == gst.EVENT_EOS:
304 self.info('End of stream for eater %s, disconnect will be '
305 'triggered', eater.eaterAlias)
306
307
308
309 return False
310 return True
311
312 def depay_event(pad, event):
313
314
315
316 if event.type == gst.EVENT_NEWSEGMENT:
317
318
319
320 if getattr(eater, '_gotFirstNewSegment', False):
321 self.info("Subsequent new segment event received on "
322 "depay on eater %s", eater.eaterAlias)
323
324 return False
325 else:
326 eater._gotFirstNewSegment = True
327 return True
328
329 self.debug('adding event probe for eater %s', eater.eaterAlias)
330 fdsrc = self.get_element(eater.elementName)
331 fdsrc.get_pad("src").add_event_probe(fdsrc_event)
332 if gstreamer.get_plugin_version('gdp') < (0, 10, 10, 1):
333 depay = self.get_element(eater.depayName)
334 depay.get_pad("src").add_event_probe(depay_event)
335
337 self.debug('setup_pipeline()')
338 assert self.bus_signal_id == None
339
340 self.pipeline.set_name('pipeline-' + self.getName())
341 bus = self.pipeline.get_bus()
342 bus.add_signal_watch()
343 self.bus_signal_id = bus.connect('message',
344 self.bus_message_received_cb)
345 sig_id = self.pipeline.connect('deep-notify',
346 gstreamer.verbose_deep_notify_cb, self)
347 self.pipeline_signals.append(sig_id)
348
349
350
351 self.pipeline.set_state(gst.STATE_READY)
352
353
354 if self._get_stats_supported:
355 self._feeder_probe_cl = reactor.callLater(
356 self.FEEDER_STATS_UPDATE_FREQUENCY, self._feeder_probe_calllater)
357 else:
358 self.warning("Feeder statistics unavailable, your "
359 "gst-plugins-base is too old")
360 m = messages.Warning(T_(N_(
361 "Your gst-plugins-base is too old, so "
362 "feeder statistics will be unavailable.")),
363 id='multifdsink')
364 m.add(T_(N_(
365 "Please upgrade '%s' to version %s."), 'gst-plugins-base',
366 '0.10.11'))
367 self.addMessage(m)
368
369 for eater in self.eaters.values():
370 self.install_eater_event_probes(eater)
371 pad = self.get_element(eater.elementName).get_pad('src')
372 self._pad_monitors.attach(pad, eater.elementName,
373 padmonitor.EaterPadMonitor,
374 self.reconnectEater,
375 eater.eaterAlias)
376 eater.setPadMonitor(self._pad_monitors[eater.elementName])
377
379 if not self.pipeline:
380 return
381
382 if self.clock_provider:
383 self.clock_provider.set_property('active', False)
384 self.clock_provider = None
385 retval = self.pipeline.set_state(gst.STATE_NULL)
386 if retval != gst.STATE_CHANGE_SUCCESS:
387 self.warning('Setting pipeline to NULL failed')
388
390 self.debug("cleaning up")
391
392 assert self.pipeline != None
393
394 self.stop_pipeline()
395
396 map(self.pipeline.disconnect, self.pipeline_signals)
397 self.pipeline_signals = []
398 if self.bus_signal_id:
399 self.pipeline.get_bus().disconnect(self.bus_signal_id)
400 self.pipeline.get_bus().remove_signal_watch()
401 self.bus_signal_id = None
402 self.pipeline = None
403
404 if self._feeder_probe_cl:
405 self._feeder_probe_cl.cancel()
406 self._feeder_probe_cl = None
407
408
409 for eater in self.eaters.values():
410 self._pad_monitors.remove(eater.elementName)
411 eater.setPadMonitor(None)
412
419
421 self.debug("Master clock set to %s:%d with base_time %s", ip, port,
422 gst.TIME_ARGS(base_time))
423
424 assert self._clock_slaved
425 if self._master_clock_info == (ip, port, base_time):
426 self.debug("Same master clock info, returning directly")
427 return defer.succeed(None)
428 elif self._master_clock_info:
429 self.stop_pipeline()
430
431 self._master_clock_info = ip, port, base_time
432
433 clock = gst.NetClientClock(None, ip, port, base_time)
434
435
436 self.pipeline.set_new_stream_time(gst.CLOCK_TIME_NONE)
437 self.pipeline.set_base_time(base_time)
438 self.pipeline.use_clock(clock)
439
440 self.try_start_pipeline()
441
443 """
444 Return the connection details for the network clock provided by
445 this component, if any.
446 """
447 if self.clock_provider:
448 ip, port, base_time = self._master_clock_info
449 return ip, port, base_time
450 else:
451 return None
452
454 """
455 Tell the component to provide a master clock on the given port.
456
457 @returns: a deferred firing a (ip, port, base_time) triple.
458 """
459 def pipelinePaused(r):
460 clock = self.pipeline.get_clock()
461
462 self.pipeline.use_clock(clock)
463
464 self.clock_provider = gst.NetTimeProvider(clock, None, port)
465 realport = self.clock_provider.get_property('port')
466
467 base_time = self.pipeline.get_base_time()
468
469 self.debug('provided master clock from %r, base time %s',
470 clock, gst.TIME_ARGS(base_time))
471
472 if self.medium:
473
474
475
476 ip = self.medium.getIP()
477 else:
478 ip = "127.0.0.1"
479
480 self._master_clock_info = (ip, realport, base_time)
481 return self.get_master_clock()
482
483 assert self.pipeline
484 assert not self._clock_slaved
485 (ret, state, pending) = self.pipeline.get_state(0)
486 if state != gst.STATE_PAUSED and state != gst.STATE_PLAYING:
487 self.debug("pipeline still spinning up: %r", state)
488 d = self._change_monitor.add(gst.STATE_CHANGE_READY_TO_PAUSED)
489 d.addCallback(pipelinePaused)
490 return d
491 elif self.clock_provider:
492 self.debug("returning existing master clock info")
493 return defer.succeed(self.get_master_clock())
494 else:
495 return defer.maybeDeferred(pipelinePaused, None)
496
497
499 """
500 Tell the component to start.
501 Whatever is using the component is responsible for making sure all
502 eaters have received their file descriptor to eat from.
503 """
504 (ret, state, pending) = self.pipeline.get_state(0)
505 if state == gst.STATE_PLAYING:
506 self.log('already PLAYING')
507 return
508
509 if self._clock_slaved and not self._master_clock_info:
510 self.debug("Missing master clock info, deferring set to PLAYING")
511 return
512
513 for eater in self.eaters.values():
514 if not eater.fd:
515 self.debug('eater %s not yet connected, deferring set to '
516 'PLAYING', eater.eaterAlias)
517 return
518
519 self.debug("Setting pipeline %r to GST_STATE_PLAYING", self.pipeline)
520 self.pipeline.set_state(gst.STATE_PLAYING)
521
543
545 """
546 After this function returns, the stream lock for this eater must have
547 been released. If your component needs to do something here, override
548 this method.
549 """
550 pass
551
553 """Get an element out of the pipeline.
554
555 If it is possible that the component has not yet been set up,
556 the caller needs to check if self.pipeline is actually set.
557 """
558 assert self.pipeline
559 self.log('Looking up element %r in pipeline %r',
560 element_name, self.pipeline)
561 element = self.pipeline.get_by_name(element_name)
562 if not element:
563 self.warning("No element named %r in pipeline", element_name)
564 return element
565
567 'Gets a property of an element in the GStreamer pipeline.'
568 self.debug("%s: getting property %s of element %s" % (self.getName(), property, element_name))
569 element = self.get_element(element_name)
570 if not element:
571 msg = "Element '%s' does not exist" % element_name
572 self.warning(msg)
573 raise errors.PropertyError(msg)
574
575 self.debug('getting property %s on element %s' % (property, element_name))
576 try:
577 value = element.get_property(property)
578 except (ValueError, TypeError):
579 msg = "Property '%s' on element '%s' does not exist" % (property, element_name)
580 self.warning(msg)
581 raise errors.PropertyError(msg)
582
583
584 if isinstance(value, gobject.GEnum):
585 value = int(value)
586
587 return value
588
590 'Sets a property on an element in the GStreamer pipeline.'
591 self.debug("%s: setting property %s of element %s to %s" % (
592 self.getName(), property, element_name, value))
593 element = self.get_element(element_name)
594 if not element:
595 msg = "Element '%s' does not exist" % element_name
596 self.warning(msg)
597 raise errors.PropertyError(msg)
598
599 self.debug('setting property %s on element %r to %s' %
600 (property, element_name, value))
601 pygobject.gobject_set_property(element, property, value)
602
603
605 if not self.medium:
606 self.debug("Can't reconnect eater %s, running "
607 "without a medium", eaterAlias)
608 return
609
610 self.eaters[eaterAlias].disconnected()
611 self.medium.connectEater(eaterAlias)
612
613 - def feedToFD(self, feedName, fd, cleanup, eaterId=None):
614 """
615 @param feedName: name of the feed to feed to the given fd.
616 @type feedName: str
617 @param fd: the file descriptor to feed to
618 @type fd: int
619 @param cleanup: the function to call when the FD is no longer feeding
620 @type cleanup: callable
621 """
622 self.debug('FeedToFD(%s, %d)', feedName, fd)
623
624
625
626 if not self.pipeline or self.pipeline.get_state(0)[1] == gst.STATE_NULL:
627 self.warning('told to feed %s to fd %d, but pipeline not '
628 'running yet', feedName, fd)
629 cleanup(fd)
630
631
632 return
633
634 if feedName not in self.feeders:
635 msg = "Cannot find feeder named '%s'" % feedName
636 mid = "feedToFD-%s" % feedName
637 m = messages.Warning(T_(N_("Internal Flumotion error.")),
638 debug=msg, id=mid, priority=40)
639 self.state.append('messages', m)
640 self.warning(msg)
641 cleanup(fd)
642 return False
643
644 feeder = self.feeders[feedName]
645 element = self.get_element(feeder.elementName)
646 assert element
647 clientId = eaterId or ('client-%d' % fd)
648 element.emit('add', fd)
649 feeder.clientConnected(clientId, fd, cleanup)
650
651 - def eatFromFD(self, eaterAlias, feedId, fd):
652 """
653 Tell the component to eat the given feedId from the given fd.
654 The component takes over the ownership of the fd, closing it when
655 no longer eating.
656
657 @param eaterAlias: the alias of the eater
658 @type eaterAlias: str
659 @param feedId: feed id (componentName:feedName) to eat from through
660 the given fd
661 @type feedId: str
662 @param fd: the file descriptor to eat from
663 @type fd: int
664 """
665 self.debug('EatFromFD(%s, %s, %d)', eaterAlias, feedId, fd)
666
667 if not self.pipeline:
668 self.warning('told to eat %s from fd %d, but pipeline not '
669 'running yet', feedId, fd)
670
671
672 os.close(fd)
673 return
674
675 if eaterAlias not in self.eaters:
676 self.warning('Unknown eater alias: %s', eaterAlias)
677 os.close(fd)
678 return
679
680 eater = self.eaters[eaterAlias]
681 element = self.get_element(eater.elementName)
682 if not element:
683 self.warning('Eater element %s not found', eater.elementName)
684 os.close(fd)
685 return
686
687
688 (result, current, pending) = element.get_state(0L)
689 pipeline_playing = current not in [gst.STATE_NULL, gst.STATE_READY]
690 if pipeline_playing:
691 self.debug('eater %s in state %r, kidnapping it',
692 eaterAlias, current)
693
694
695
696
697
698
699
700 srcpad = element.get_pad('src')
701
702 def _block_cb(pad, blocked):
703 pass
704 srcpad.set_blocked_async(True, _block_cb)
705 self.unblock_eater(eaterAlias)
706
707
708 sinkpad = srcpad.get_peer()
709 srcpad.unlink(sinkpad)
710 parent = element.get_parent()
711 parent.remove(element)
712 self.log("setting to ready")
713 element.set_state(gst.STATE_READY)
714 self.log("setting to ready complete!!!")
715 old = element.get_property('fd')
716 self.log("Closing old fd %d", old)
717 os.close(old)
718 element.set_property('fd', fd)
719 parent.add(element)
720 srcpad.link(sinkpad)
721 element.set_state(gst.STATE_PLAYING)
722
723 srcpad.set_blocked_async(False, _block_cb)
724 else:
725 element.set_property('fd', fd)
726
727
728
729 eater.connected(fd, feedId)
730
731 if not pipeline_playing:
732 self.try_start_pipeline()
733