Package flumotion :: Package component :: Package decoders :: Package generic :: Module generic
[hide private]

Source Code for Module flumotion.component.decoders.generic.generic

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import gst 
 19  import gobject 
 20  import threading 
 21   
 22  from flumotion.component import decodercomponent as dc 
 23  from flumotion.common import messages, gstreamer 
 24  from flumotion.common.i18n import N_, gettexter 
 25   
 26  T_ = gettexter() 
 27   
 28  __version__ = "$Rev: 7162 $" 
 29   
 30  BASIC_AUDIO_CAPS = "audio/x-raw-int;audio/x-raw-float" 
 31  BASIC_VIDEO_CAPS = "video/x-raw-yuv;video/x-raw-rgb" 
 32   
 33  # FIXME: The GstAutoplugSelectResult enum has no bindings in gst-python. 
 34  # Replace this when the enum is exposed in the bindings. 
 35   
 36  GST_AUTOPLUG_SELECT_TRY = 0 
 37  GST_AUTOPLUG_SELECT_SKIP = 2 
 38   
 39   
40 -class FeederInfo(object):
41
42 - def __init__(self, name, caps, linked=False):
43 self.name = name 44 self.caps = caps
45 46
47 -class SyncKeeper(gst.Element):
48 __gstdetails__ = ('SyncKeeper', 'Generic', 49 'Retimestamp the output to be contiguous and maintain ' 50 'the sync', 'Xavier Queralt') 51 _audiosink = gst.PadTemplate("audio-in", 52 gst.PAD_SINK, 53 gst.PAD_ALWAYS, 54 gst.caps_from_string(BASIC_AUDIO_CAPS)) 55 _videosink = gst.PadTemplate("video-in", 56 gst.PAD_SINK, 57 gst.PAD_ALWAYS, 58 gst.caps_from_string(BASIC_VIDEO_CAPS)) 59 _audiosrc = gst.PadTemplate("audio-out", 60 gst.PAD_SRC, 61 gst.PAD_ALWAYS, 62 gst.caps_from_string(BASIC_AUDIO_CAPS)) 63 _videosrc = gst.PadTemplate("video-out", 64 gst.PAD_SRC, 65 gst.PAD_ALWAYS, 66 gst.caps_from_string(BASIC_VIDEO_CAPS)) 67
68 - def __init__(self):
69 gst.Element.__init__(self) 70 71 # create source pads 72 self.audiosrc = gst.Pad(self._audiosrc, "audio-out") 73 self.add_pad(self.audiosrc) 74 self.videosrc = gst.Pad(self._videosrc, "video-out") 75 self.add_pad(self.videosrc) 76 77 # create the sink pads and set the chain and event function 78 self.audiosink = gst.Pad(self._audiosink, "audio-in") 79 self.audiosink.set_chain_function(lambda pad, buffer: 80 self.chainfunc(pad, buffer, self.audiosrc)) 81 self.audiosink.set_event_function(lambda pad, buffer: 82 self.eventfunc(pad, buffer, self.audiosrc)) 83 self.add_pad(self.audiosink) 84 self.videosink = gst.Pad(self._videosink, "video-in") 85 self.videosink.set_chain_function(lambda pad, buffer: 86 self.chainfunc(pad, buffer, self.videosrc)) 87 self.videosink.set_event_function(lambda pad, buffer: 88 self.eventfunc(pad, buffer, self.videosrc)) 89 self.add_pad(self.videosink) 90 91 # all this variables need to be protected with a lock!!! 92 self._lock = threading.Lock() 93 self._totalTime = 0L 94 self._syncTimestamp = 0L 95 self._syncOffset = 0L 96 self._resetReceived = True 97 self._sendNewSegment = True
98
99 - def _send_new_segment(self):
100 for pad in [self.videosrc, self.audiosrc]: 101 pad.push_event( 102 gst.event_new_new_segment(True, 1.0, gst.FORMAT_TIME, 103 self._syncTimestamp, -1, 0)) 104 self._sendNewSegment = False
105
106 - def _update_sync_point(self, start, position):
107 # Only update the sync point if we haven't received any buffer 108 # (totalTime == 0) or we received a reset 109 if not self._totalTime and not self._resetReceived: 110 return 111 self._syncTimestamp = self._totalTime 112 if position >= start: 113 self._syncOffset = start + (position - start) 114 else: 115 self._syncOffset = start 116 self._resetReceived = False 117 self.info("Update sync point to % r, offset to %r" % 118 (gst.TIME_ARGS(self._syncTimestamp), 119 (gst.TIME_ARGS(self._syncOffset))))
120
121 - def chainfunc(self, pad, buf, srcpad):
122 self.log("Input %s timestamp: %s, %s" % 123 (srcpad is self.audiosrc and 'audio' or 'video', 124 gst.TIME_ARGS(buf.timestamp), 125 gst.TIME_ARGS(buf.duration))) 126 127 if not self._sendNewSegment: 128 self._send_new_segment() 129 130 try: 131 self._lock.acquire() 132 # Discard buffers outside the configured segment 133 if buf.timestamp < self._syncOffset: 134 self.warning("Could not clip buffer to segment") 135 return gst.FLOW_OK 136 if buf.timestamp == gst.CLOCK_TIME_NONE: 137 return gst.FLOW_OK 138 # Get the input stream time of the buffer 139 buf.timestamp -= self._syncOffset 140 # Set the accumulated stream time 141 buf.timestamp += self._syncTimestamp 142 duration = 0 143 if buf.duration != gst.CLOCK_TIME_NONE: 144 duration = buf.duration 145 self._totalTime = max(buf.timestamp + duration, self._totalTime) 146 147 self.log("Output %s timestamp: %s, %s" % 148 (srcpad is self.audiosrc and 'audio' or 'video', 149 gst.TIME_ARGS(buf.timestamp), 150 gst.TIME_ARGS(buf.duration))) 151 finally: 152 self._lock.release() 153 154 srcpad.push(buf) 155 return gst.FLOW_OK
156
157 - def eventfunc(self, pad, event, srcpad):
158 self.debug("Received event %r from %s" % (event, event.src)) 159 try: 160 self._lock.acquire() 161 if event.type == gst.EVENT_NEWSEGMENT: 162 u, r, f, start, s, position = event.parse_new_segment() 163 self._update_sync_point(start, position) 164 if gstreamer.event_is_flumotion_reset(event): 165 self._resetReceived = True 166 self._send_new_segment = True 167 finally: 168 self._lock.release() 169 170 # forward all the events except the new segment events 171 if event.type != gst.EVENT_NEWSEGMENT: 172 return srcpad.push_event(event) 173 return True
174 175 gobject.type_register(SyncKeeper) 176 gst.element_register(SyncKeeper, "synckeeper", gst.RANK_MARGINAL) 177 178
179 -class GenericDecoder(dc.DecoderComponent):
180 """ 181 Generic decoder component using decodebin2. 182 183 It listen to the custom gstreamer event flumotion-reset, 184 and reset the decoding pipeline by removing the old one 185 and creating a new one. 186 187 Sub-classes must override _get_feeders_info() and return 188 a list of FeederInfo instances that describe the decoder 189 output. 190 191 When reset, if the new decoded pads do not match the 192 previously negotiated caps, feeder will not be connected, 193 and the decoder will go sad. 194 """ 195 196 logCategory = "gen-decoder" 197 feeder_tmpl = ("identity name=%(ename)s single-segment=true " 198 "silent=true ! %(caps)s ! @feeder:%(pad)s@") 199 200 ### Public Methods ### 201
202 - def init(self):
203 self._feeders_info = None # {FEEDER_NAME: FeederInfo}
204
205 - def get_pipeline_string(self, properties):
206 # Retrieve feeder info and build a dict out of it 207 finfo = self._get_feeders_info() 208 assert finfo, "No feeder info specified" 209 self._feeders_info = dict([(i.name, i) for i in finfo]) 210 211 pipeline_parts = [self._get_base_pipeline_string()] 212 213 for i in self._feeders_info.values(): 214 ename = self._get_output_element_name(i.name) 215 pipeline_parts.append( 216 self.feeder_tmpl % dict(ename=ename, caps=i.caps, pad=i.name)) 217 218 pipeline_str = " ".join(pipeline_parts) 219 self.log("Decoder pipeline: %s", pipeline_str) 220 221 self._blacklist = properties.get('blacklist', []) 222 223 return pipeline_str
224
225 - def configure_pipeline(self, pipeline, properties):
226 dc.DecoderComponent.configure_pipeline(self, pipeline, 227 properties) 228 229 decoder = self.pipeline.get_by_name("decoder") 230 decoder.connect('autoplug-select', self._autoplug_select_cb)
231 232 ### Protected Methods ## 233
235 return 'decodebin2 name=decoder'
236
237 - def _get_feeders_info(self):
238 """ 239 Must be overridden to returns a tuple of FeederInfo. 240 """ 241 return None
242 243 ### Private Methods ### 244
245 - def _get_output_element_name(self, feed_name):
246 return "%s-output" % feed_name
247 248 ### Callbacks ### 249
250 - def _autoplug_select_cb(self, decoder, pad, caps, factory):
251 if factory.get_name() in self._blacklist: 252 self.log("Skipping element %s because it's in the blacklist", 253 factory.get_name()) 254 return GST_AUTOPLUG_SELECT_SKIP 255 return GST_AUTOPLUG_SELECT_TRY
256 257
258 -class SingleGenericDecoder(GenericDecoder):
259 260 logCategory = "sgen-decoder" 261 262 _caps_lookup = {'audio': BASIC_AUDIO_CAPS, 263 'video': BASIC_VIDEO_CAPS} 264
265 - def init(self):
266 self._media_type = None
267
268 - def check_properties(self, properties, addMessage):
269 media_type = properties.get("media-type") 270 if media_type not in ["audio", "video"]: 271 msg = 'Property media-type can only be "audio" or "video"' 272 m = messages.Error(T_(N_(msg)), mid="error-decoder-media-type") 273 addMessage(m) 274 else: 275 self._media_type = media_type
276
277 - def _get_feeders_info(self):
278 caps = self._caps_lookup[self._media_type] 279 return FeederInfo('default', caps),
280 281
282 -class AVGenericDecoder(GenericDecoder):
283 284 logCategory = "avgen-decoder" 285 feeder_tmpl = ("identity name=%(ename)s silent=true ! %(caps)s ! " 286 "sync.%(pad)s-in sync.%(pad)s-out ! @feeder:%(pad)s@") 287
288 - def _get_feeders_info(self):
289 return (FeederInfo('audio', BASIC_AUDIO_CAPS), 290 FeederInfo('video', BASIC_VIDEO_CAPS))
291
293 return 'decodebin2 name=decoder synckeeper name=sync'
294