1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import gst
23 import gobject
24
25 import os
26 import time
27
28 from twisted.internet import reactor, defer
29
30 from flumotion.common import common, errors, pygobject, messages, log
31 from flumotion.common import gstreamer
32 from flumotion.common.i18n import N_, gettexter
33 from flumotion.common.planet import moods
34 from flumotion.component import component as basecomponent
35 from flumotion.component import feed, padmonitor
36 from flumotion.component.feeder import Feeder
37 from flumotion.component.eater import Eater
38
39 __version__ = "$Rev$"
40 T_ = gettexter()
41
42
44 """
45 I am a base class for all Flumotion feed components.
46 """
47
48
49 FEEDER_STATS_UPDATE_FREQUENCY = 12.5
50 keepStreamheaderForLater = False
51 dropStreamHeaders = True
52 swallowNewSegment = True
53
54 logCategory = 'feedcomponent'
55
56
57
59
60 self.feeders = {}
61 self.eaters = {}
62 self.uiState.addListKey('feeders')
63 self.uiState.addListKey('eaters')
64 self.uiState.addKey('gst-debug')
65
66 self.pipeline = None
67 self.pipeline_signals = []
68 self.bus_signal_id = None
69 self.effects = {}
70 self._feeder_probe_cl = None
71
72 self._pad_monitors = padmonitor.PadMonitorSet(
73 lambda: self.setMood(moods.happy),
74 lambda: self.setMood(moods.hungry))
75
76 self._clock_slaved = False
77 self.clock_provider = None
78 self._master_clock_info = None
79
80
81 self._change_monitor = gstreamer.StateChangeMonitor()
82
83
84 self._get_stats_supported = (gstreamer.get_plugin_version('tcp')
85 >= (0, 10, 11, 0))
86
88 """
89 Sets up component.
90
91 Invokes the L{create_pipeline} and L{set_pipeline} vmethods,
92 which subclasses can provide.
93 """
94 config = self.config
95 eater_config = config.get('eater', {})
96 feeder_config = config.get('feed', [])
97 source_config = config.get('source', [])
98
99 self.debug("FeedComponent.do_setup(): eater_config %r", eater_config)
100 self.debug("FeedComponent.do_setup(): feeder_config %r", feeder_config)
101 self.debug("FeedComponent.do_setup(): source_config %r", source_config)
102
103
104
105 if eater_config == {} and source_config != []:
106 eater_config = {'default': [(x, 'default') for x in source_config]}
107
108 for eaterName in eater_config:
109 for feedId, eaterAlias in eater_config[eaterName]:
110 self.eaters[eaterAlias] = Eater(eaterAlias, eaterName)
111 self.uiState.append('eaters', self.eaters[eaterAlias].uiState)
112
113 for feederName in feeder_config:
114 self.feeders[feederName] = Feeder(feederName)
115 self.uiState.append('feeders',
116 self.feeders[feederName].uiState)
117
118 clockMaster = config.get('clock-master', None)
119 if clockMaster:
120 self._clock_slaved = clockMaster != config['avatarId']
121 else:
122 self._clock_slaved = False
123
124 pipeline = self.create_pipeline()
125 self.connect_feeders(pipeline)
126 self.set_pipeline(pipeline)
127
128 self.uiState.set('gst-debug', os.environ.get('GST_DEBUG', '*:0'))
129 self.debug("FeedComponent.do_setup(): setup finished")
130
131 self.try_start_pipeline()
132
133
134 d = self._change_monitor.add(gst.STATE_CHANGE_PAUSED_TO_PLAYING)
135 d.addCallback(lambda x: self.do_pipeline_playing())
136
138
139
140 self.debug("Setup completed")
141
142
143
145 """
146 Subclasses have to implement this method.
147
148 @rtype: L{gst.Pipeline}
149 """
150 raise NotImplementedError(
151 "subclass must implement create_pipeline")
152
162
164 elementName = self.feeders[feederName].payName
165 element = self.pipeline.get_by_name(elementName)
166 if not element:
167 raise errors.ComponentError("No such feeder %s" % feederName)
168
169 pad = element.get_pad('src')
170 self._pad_monitors.attach(pad, elementName)
171
172
173
177
179
180
181
182 def client_fd_removed(sink, fd, feeder):
183
184
185
186
187 self.debug("cleaning up fd %d", fd)
188 feeder.clientDisconnected(fd)
189
190 for feeder in self.feeders.values():
191 element = pipeline.get_by_name(feeder.elementName)
192 if element:
193 element.connect('client-fd-removed', client_fd_removed,
194 feeder)
195 self.debug("Connected to client-fd-removed on %r", feeder)
196 else:
197 self.warning("No feeder %s in pipeline", feeder.elementName)
198
201
203 """
204 Invoked when the pipeline has changed the state to playing.
205 The default implementation sets the component's mood to HAPPY.
206 """
207 self.setMood(moods.happy)
208
210 """Make a flumotion error message to show to the user.
211
212 This method may be overridden by components that have special
213 knowledge about potential errors. If the component does not know
214 about the error, it can chain up to this implementation, which
215 will make a generic message.
216
217 @param gerror: The GError from the error message posted on the
218 GStreamer message bus.
219 @type gerror: L{gst.GError}
220 @param debug: A string with debugging information.
221 @type debug: str
222
223 @returns: A L{flumotion.common.messages.Message} to show to the
224 user.
225 """
226
227 mid = "%s-%s-%d" % (self.name, gerror.domain, gerror.code)
228 m = messages.Error(T_(N_(
229 "Internal GStreamer error.")),
230 debug="%s\n%s: %d\n%s" % (
231 gerror.message, gerror.domain, gerror.code, debug),
232 mid=mid, priority=40)
233 return m
234
245
246 def error():
247 gerror, debug = message.parse_error()
248 self.warning('element %s error %s %s',
249 src.get_path_string(), gerror, debug)
250 self.setMood(moods.sad)
251
252
253 try:
254 m = self.make_message_for_gstreamer_error(gerror, debug)
255 except Exception, e:
256 msg = log.getExceptionMessage(e)
257 m = messages.Error(T_(N_(
258 "Programming error in component.")),
259 debug="Bug in %r.make_message_for_gstreamer_error: %s" % (
260 self.__class__, msg))
261
262 self.state.append('messages', m)
263 self._change_monitor.have_error(self.pipeline.get_state(),
264 message)
265
266 def eos():
267 name = src.get_name()
268 if name in self._pad_monitors:
269 self.info('End of stream in element %s', name)
270 self._pad_monitors[name].setInactive()
271 else:
272 self.info("We got an eos from %s", name)
273
274 def default():
275 self.log('message received: %r', message)
276
277 handlers = {gst.MESSAGE_STATE_CHANGED: state_changed,
278 gst.MESSAGE_ERROR: error,
279 gst.MESSAGE_EOS: eos}
280 t = message.type
281 src = message.src
282 handlers.get(t, default)()
283 return True
284
286 """Watch a set of elements for discontinuity messages.
287
288 @param eaterWatchElements: the set of elements to watch for
289 discontinuities.
290 @type eaterWatchElements: Dict of elementName => Eater.
291 """
292
293 def on_element_message(bus, message):
294 src = message.src
295 name = src.get_name()
296 if name in eaterWatchElements:
297 eater = eaterWatchElements[name]
298 s = message.structure
299
300 def timestampDiscont():
301 prevTs = s["prev-timestamp"]
302 prevDuration = s["prev-duration"]
303 curTs = s["cur-timestamp"]
304
305 if prevTs == gst.CLOCK_TIME_NONE:
306 self.debug("no previous timestamp")
307 return
308 if prevDuration == gst.CLOCK_TIME_NONE:
309 self.debug("no previous duration")
310 return
311 if curTs == gst.CLOCK_TIME_NONE:
312 self.debug("no current timestamp")
313 return
314
315 discont = curTs - (prevTs + prevDuration)
316 dSeconds = discont / float(gst.SECOND)
317 self.debug("we have a discont on eater %s of %.9f s "
318 "between %s and %s ", eater.eaterAlias,
319 dSeconds,
320 gst.TIME_ARGS(prevTs + prevDuration),
321 gst.TIME_ARGS(curTs))
322
323 eater.timestampDiscont(dSeconds,
324 float(curTs) / float(gst.SECOND))
325
326 def offsetDiscont():
327 prevOffsetEnd = s["prev-offset-end"]
328 curOffset = s["cur-offset"]
329 discont = curOffset - prevOffsetEnd
330 self.debug("we have a discont on eater %s of %d "
331 "units between %d and %d ",
332 eater.eaterAlias, discont, prevOffsetEnd,
333 curOffset)
334 eater.offsetDiscont(discont, curOffset)
335
336 handlers = {'imperfect-timestamp': timestampDiscont,
337 'imperfect-offset': offsetDiscont}
338 if s.get_name() in handlers:
339 handlers[s.get_name()]()
340
341
342 bus = self.pipeline.get_bus()
343
344 bus.connect("message::element", on_element_message)
345
347
348 def fdsrc_event(pad, event):
349
350
351 if event.type == gst.EVENT_EOS:
352 self.info('End of stream for eater %s, disconnect will be '
353 'triggered', eater.eaterAlias)
354
355
356
357 return False
358 return True
359
360 def depay_event(pad, event):
361
362
363
364 if event.type == gst.EVENT_NEWSEGMENT:
365
366
367
368
369
370
371
372 if getattr(eater, '_gotFirstNewSegment', False):
373 self.info("Subsequent new segment event received on "
374 "depay on eater %s", eater.eaterAlias)
375
376 eater.streamheader = []
377 if self.swallowNewSegment:
378 return False
379 else:
380 eater._gotFirstNewSegment = True
381 return True
382
383 self.debug('adding event probe for eater %s', eater.eaterAlias)
384 fdsrc = self.get_element(eater.elementName)
385 fdsrc.get_pad("src").add_event_probe(fdsrc_event)
386 depay = self.get_element(eater.depayName)
387 depay.get_pad("src").add_event_probe(depay_event)
388
390 self.debug('setup_pipeline()')
391 assert self.bus_signal_id == None
392
393 self.pipeline.set_name('pipeline-' + self.getName())
394 bus = self.pipeline.get_bus()
395 bus.add_signal_watch()
396 self.bus_signal_id = bus.connect('message',
397 self.bus_message_received_cb)
398 sig_id = self.pipeline.connect('deep-notify',
399 gstreamer.verbose_deep_notify_cb, self)
400 self.pipeline_signals.append(sig_id)
401
402
403
404 self.pipeline.set_state(gst.STATE_READY)
405
406
407 if self._get_stats_supported:
408 self._feeder_probe_cl = reactor.callLater(
409 self.FEEDER_STATS_UPDATE_FREQUENCY,
410 self._feeder_probe_calllater)
411 else:
412 self.warning("Feeder statistics unavailable, your "
413 "gst-plugins-base is too old")
414 m = messages.Warning(T_(N_(
415 "Your gst-plugins-base is too old, so "
416 "feeder statistics will be unavailable.")),
417 mid='multifdsink')
418 m.add(T_(N_(
419 "Please upgrade '%s' to version %s."), 'gst-plugins-base',
420 '0.10.11'))
421 self.addMessage(m)
422
423 for eater in self.eaters.values():
424 self.install_eater_event_probes(eater)
425 pad = self.get_element(eater.elementName).get_pad('src')
426 self._pad_monitors.attach(pad, eater.elementName,
427 padmonitor.EaterPadMonitor,
428 self.reconnectEater,
429 eater.eaterAlias)
430 eater.setPadMonitor(self._pad_monitors[eater.elementName])
431
433 if not self.pipeline:
434 return
435
436 if self.clock_provider:
437 self.clock_provider.set_property('active', False)
438 self.clock_provider = None
439 retval = self.pipeline.set_state(gst.STATE_NULL)
440 if retval != gst.STATE_CHANGE_SUCCESS:
441 self.warning('Setting pipeline to NULL failed')
442
444 self.debug("cleaning up")
445
446 assert self.pipeline != None
447
448 self.stop_pipeline()
449
450 map(self.pipeline.disconnect, self.pipeline_signals)
451 self.pipeline_signals = []
452 if self.bus_signal_id:
453 self.pipeline.get_bus().disconnect(self.bus_signal_id)
454 self.pipeline.get_bus().remove_signal_watch()
455 self.bus_signal_id = None
456 self.pipeline = None
457
458 if self._feeder_probe_cl:
459 self._feeder_probe_cl.cancel()
460 self._feeder_probe_cl = None
461
462
463 for eater in self.eaters.values():
464 self._pad_monitors.remove(eater.elementName)
465 eater.setPadMonitor(None)
466
473
475 self.debug("Master clock set to %s:%d with base_time %s", ip, port,
476 gst.TIME_ARGS(base_time))
477
478 assert self._clock_slaved
479 if self._master_clock_info == (ip, port, base_time):
480 self.debug("Same master clock info, returning directly")
481 return defer.succeed(None)
482 elif self._master_clock_info:
483 self.stop_pipeline()
484
485 self._master_clock_info = ip, port, base_time
486
487 clock = gst.NetClientClock(None, ip, port, base_time)
488
489
490 self.pipeline.set_new_stream_time(gst.CLOCK_TIME_NONE)
491 self.pipeline.set_base_time(base_time)
492 self.pipeline.use_clock(clock)
493
494 self.try_start_pipeline()
495
497 """
498 Return the connection details for the network clock provided by
499 this component, if any.
500 """
501 if self.clock_provider:
502 ip, port, base_time = self._master_clock_info
503 return ip, port, base_time
504 else:
505 return None
506
508 """
509 Tell the component to provide a master clock on the given port.
510
511 @returns: a deferred firing a (ip, port, base_time) triple.
512 """
513
514 def pipelinePaused(r):
515 clock = self.pipeline.get_clock()
516
517 self.pipeline.use_clock(clock)
518
519 self.clock_provider = gst.NetTimeProvider(clock, None, port)
520 realport = self.clock_provider.get_property('port')
521
522 base_time = self.pipeline.get_base_time()
523
524 self.debug('provided master clock from %r, base time %s',
525 clock, gst.TIME_ARGS(base_time))
526
527 if self.medium:
528
529
530
531 ip = self.medium.getIP()
532 else:
533 ip = "127.0.0.1"
534
535 self._master_clock_info = (ip, realport, base_time)
536 return self.get_master_clock()
537
538 assert self.pipeline
539 assert not self._clock_slaved
540 (ret, state, pending) = self.pipeline.get_state(0)
541 if state != gst.STATE_PAUSED and state != gst.STATE_PLAYING:
542 self.debug("pipeline still spinning up: %r", state)
543 d = self._change_monitor.add(gst.STATE_CHANGE_READY_TO_PAUSED)
544 d.addCallback(pipelinePaused)
545 return d
546 elif self.clock_provider:
547 self.debug("returning existing master clock info")
548 return defer.succeed(self.get_master_clock())
549 else:
550 return defer.maybeDeferred(pipelinePaused, None)
551
553 """
554 Dumps a graphviz dot file of the pipeline's current state to disk.
555 This will only actually do anything if the environment variable
556 GST_DEBUG_DUMP_DOT_DIR is set.
557
558 @param filename: filename to store
559 @param with_timestamp: if True, then timestamp will be prepended to
560 filename
561 """
562 if hasattr(gst, "DEBUG_BIN_TO_DOT_FILE"):
563 method = gst.DEBUG_BIN_TO_DOT_FILE
564 if with_timestamp:
565 method = gst.DEBUG_BIN_TO_DOT_FILE_WITH_TS
566 method(self.pipeline, gst.DEBUG_GRAPH_SHOW_ALL, filename)
567
568
569
571 """
572 Tell the component to start.
573 Whatever is using the component is responsible for making sure all
574 eaters have received their file descriptor to eat from.
575 """
576 (ret, state, pending) = self.pipeline.get_state(0)
577 if state == gst.STATE_PLAYING:
578 self.log('already PLAYING')
579 if not force:
580 return
581 self.debug('pipeline PLAYING, but starting anyway as requested')
582
583 if self._clock_slaved and not self._master_clock_info:
584 self.debug("Missing master clock info, deferring set to PLAYING")
585 return
586
587 for eater in self.eaters.values():
588 if not eater.fd:
589 self.debug('eater %s not yet connected, deferring set to '
590 'PLAYING', eater.eaterAlias)
591 return
592
593 self.debug("Setting pipeline %r to GST_STATE_PLAYING", self.pipeline)
594 self.pipeline.set_state(gst.STATE_PLAYING)
595
618
620 """
621 After this function returns, the stream lock for this eater must have
622 been released. If your component needs to do something here, override
623 this method.
624 """
625 pass
626
628 """Get an element out of the pipeline.
629
630 If it is possible that the component has not yet been set up,
631 the caller needs to check if self.pipeline is actually set.
632 """
633 assert self.pipeline
634 self.log('Looking up element %r in pipeline %r',
635 element_name, self.pipeline)
636 element = self.pipeline.get_by_name(element_name)
637 if not element:
638 self.warning("No element named %r in pipeline", element_name)
639 return element
640
642 'Gets a property of an element in the GStreamer pipeline.'
643 self.debug("%s: getting property %s of element %s" % (
644 self.getName(), property, element_name))
645 element = self.get_element(element_name)
646 if not element:
647 msg = "Element '%s' does not exist" % element_name
648 self.warning(msg)
649 raise errors.PropertyError(msg)
650
651 self.debug('getting property %s on element %s' % (
652 property, element_name))
653 try:
654 value = element.get_property(property)
655 except (ValueError, TypeError):
656 msg = "Property '%s' on element '%s' does not exist" % (
657 property, element_name)
658 self.warning(msg)
659 raise errors.PropertyError(msg)
660
661
662 if isinstance(value, gobject.GEnum):
663 value = int(value)
664
665 return value
666
668 'Sets a property on an element in the GStreamer pipeline.'
669 self.debug("%s: setting property %s of element %s to %s" % (
670 self.getName(), property, element_name, value))
671 element = self.get_element(element_name)
672 if not element:
673 msg = "Element '%s' does not exist" % element_name
674 self.warning(msg)
675 raise errors.PropertyError(msg)
676
677 self.debug('setting property %s on element %r to %s' %
678 (property, element_name, value))
679 pygobject.gobject_set_property(element, property, value)
680
681
682
684 if not self.medium:
685 self.debug("Can't reconnect eater %s, running "
686 "without a medium", eaterAlias)
687 return
688
689 self.eaters[eaterAlias].disconnected()
690 self.medium.connectEater(eaterAlias)
691
692 - def feedToFD(self, feedName, fd, cleanup, eaterId=None):
693 """
694 @param feedName: name of the feed to feed to the given fd.
695 @type feedName: str
696 @param fd: the file descriptor to feed to
697 @type fd: int
698 @param cleanup: the function to call when the FD is no longer feeding
699 @type cleanup: callable
700 """
701 self.debug('FeedToFD(%s, %d)', feedName, fd)
702
703
704
705 if (not self.pipeline or
706 self.pipeline.get_state(0)[1] == gst.STATE_NULL):
707 self.warning('told to feed %s to fd %d, but pipeline not '
708 'running yet', feedName, fd)
709 cleanup(fd)
710
711
712 return
713
714 if feedName not in self.feeders:
715 msg = "Cannot find feeder named '%s'" % feedName
716 mid = "feedToFD-%s" % feedName
717 m = messages.Warning(T_(N_("Internal Flumotion error.")),
718 debug=msg, mid=mid, priority=40)
719 self.state.append('messages', m)
720 self.warning(msg)
721 cleanup(fd)
722 return False
723
724 feeder = self.feeders[feedName]
725 element = self.get_element(feeder.elementName)
726 assert element
727 clientId = eaterId or ('client-%d' % fd)
728 element.emit('add', fd)
729 feeder.clientConnected(clientId, fd, cleanup)
730
731 - def eatFromFD(self, eaterAlias, feedId, fd):
732 """
733 Tell the component to eat the given feedId from the given fd.
734 The component takes over the ownership of the fd, closing it when
735 no longer eating.
736
737 @param eaterAlias: the alias of the eater
738 @type eaterAlias: str
739 @param feedId: feed id (componentName:feedName) to eat from through
740 the given fd
741 @type feedId: str
742 @param fd: the file descriptor to eat from
743 @type fd: int
744 """
745 self.debug('EatFromFD(%s, %s, %d)', eaterAlias, feedId, fd)
746
747 if not self.pipeline:
748 self.warning('told to eat %s from fd %d, but pipeline not '
749 'running yet', feedId, fd)
750
751
752 os.close(fd)
753 return
754
755 if eaterAlias not in self.eaters:
756 self.warning('Unknown eater alias: %s', eaterAlias)
757 os.close(fd)
758 return
759
760 eater = self.eaters[eaterAlias]
761 element = self.get_element(eater.elementName)
762 if not element:
763 self.warning('Eater element %s not found', eater.elementName)
764 os.close(fd)
765 return
766
767
768 (result, current, pending) = element.get_state(0L)
769 pipeline_playing = current not in [gst.STATE_NULL, gst.STATE_READY]
770 if pipeline_playing:
771 self.debug('eater %s in state %r, kidnapping it',
772 eaterAlias, current)
773
774
775
776
777
778
779
780 srcpad = element.get_pad('src')
781
782 def _block_cb(pad, blocked):
783 pass
784 srcpad.set_blocked_async(True, _block_cb)
785
786
787 depay = self.get_element(eater.depayName)
788
789 def remove_in_caps_buffers(pad, buffer, eater):
790 if buffer.flag_is_set(gst.BUFFER_FLAG_IN_CAPS):
791 if self.keepStreamheaderForLater:
792 self.log("We got buffer with IN_CAPS which we are "
793 "keeping for later %r", eater)
794 eater.streamheader.append(buffer)
795 return False
796 self.info("We got streamheader buffer which " \
797 "we are dropping because we do not want this just " \
798 "after a reconnect because it breaks everything ")
799 return False
800
801
802
803 self.log("We got buffer with no in caps flag set on "
804 "eater %r", eater)
805 if eater.streamheaderBufferProbeHandler:
806 self.log("Removing buffer probe on depay src pad on "
807 "eater %r", eater)
808 pad.remove_buffer_probe(
809 eater.streamheaderBufferProbeHandler)
810 eater.streamheaderBufferProbeHandler = None
811 else:
812 self.warning("buffer probe handler is None, bad news on "
813 "eater %r", eater)
814
815 if not self.dropStreamHeaders:
816 self.log("Pushing earlier buffers with IN_CAPS flag")
817 for buff in eater.streamheader:
818 pad.push(buff)
819 self.dropStreamHeaders = True
820
821 eater.streamheader = []
822 return True
823
824 if not eater.streamheaderBufferProbeHandler:
825 self.log("Adding buffer probe on depay src pad on "
826 "eater %r", eater)
827 eater.streamheaderBufferProbeHandler = \
828 depay.get_pad("src").add_buffer_probe(
829 remove_in_caps_buffers, eater)
830
831 self.unblock_eater(eaterAlias)
832
833
834 sinkpad = srcpad.get_peer()
835 srcpad.unlink(sinkpad)
836 parent = element.get_parent()
837 parent.remove(element)
838 self.log("setting to ready")
839 element.set_state(gst.STATE_READY)
840 self.log("setting to ready complete!!!")
841 old = element.get_property('fd')
842 self.log("Closing old fd %d", old)
843 os.close(old)
844 element.set_property('fd', fd)
845 parent.add(element)
846 srcpad.link(sinkpad)
847 element.set_state(gst.STATE_PLAYING)
848
849 srcpad.set_blocked_async(False, _block_cb)
850 else:
851 element.set_property('fd', fd)
852
853
854
855 eater.connected(fd, feedId)
856
857 if not pipeline_playing:
858 self.try_start_pipeline()
859