Package flumotion :: Package component :: Module feedcomponent
[hide private]

Source Code for Module flumotion.component.feedcomponent

   1  # -*- Mode: Python -*- 
   2  # vi:si:et:sw=4:sts=4:ts=4 
   3  # 
   4  # Flumotion - a streaming media server 
   5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
   6  # All rights reserved. 
   7   
   8  # This file may be distributed and/or modified under the terms of 
   9  # the GNU General Public License version 2 as published by 
  10  # the Free Software Foundation. 
  11  # This file is distributed without any warranty; without even the implied 
  12  # warranty of merchantability or fitness for a particular purpose. 
  13  # See "LICENSE.GPL" in the source distribution for more information. 
  14   
  15  # Licensees having purchased or holding a valid Flumotion Advanced 
  16  # Streaming Server license may use this file in accordance with the 
  17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
  18  # See "LICENSE.Flumotion" in the source distribution for more information. 
  19   
  20  # Headers in this file shall remain intact. 
  21   
  22  """ 
  23  Feed components, participating in the stream 
  24  """ 
  25   
  26  import os 
  27   
  28  import gst 
  29  import gst.interfaces 
  30  import gobject 
  31   
  32  from twisted.internet import reactor, defer 
  33  from twisted.spread import pb 
  34  from zope.interface import implements 
  35   
  36  from flumotion.configure import configure 
  37  from flumotion.component import component as basecomponent 
  38  from flumotion.component import feed 
  39  from flumotion.common import common, interfaces, errors, log, pygobject, \ 
  40       messages 
  41  from flumotion.common import gstreamer 
  42  from flumotion.common.i18n import N_, gettexter 
  43  from flumotion.common.planet import moods 
  44  from flumotion.common.pygobject import gsignal 
  45   
  46  __version__ = "$Rev$" 
  47  T_ = gettexter() 
  48   
  49   
50 -class FeedComponentMedium(basecomponent.BaseComponentMedium):
51 """ 52 I am a component-side medium for a FeedComponent to interface with 53 the manager-side ComponentAvatar. 54 """ 55 implements(interfaces.IComponentMedium) 56 logCategory = 'feedcompmed' 57 remoteLogName = 'feedserver' 58
59 - def __init__(self, component):
60 """ 61 @param component: L{flumotion.component.feedcomponent.FeedComponent} 62 """ 63 basecomponent.BaseComponentMedium.__init__(self, component) 64 65 self._feederFeedServer = {} # eaterAlias -> (fullFeedId, host, port) 66 # tuple for remote feeders 67 self._feederPendingConnections = {} # eaterAlias -> cancel thunk 68 self._eaterFeedServer = {} # fullFeedId -> (host, port) tuple 69 # for remote eaters 70 self._eaterPendingConnections = {} # feederName -> cancel thunk 71 self.logName = component.name
72 73 ### Referenceable remote methods which can be called from manager 74
75 - def remote_attachPadMonitorToFeeder(self, feederName):
76 self.comp.attachPadMonitorToFeeder(feederName)
77
78 - def remote_setGstDebug(self, debug):
79 """ 80 Sets the GStreamer debugging levels based on the passed debug string. 81 82 @since: 0.4.2 83 """ 84 self.debug('Setting GStreamer debug level to %s' % debug) 85 if not debug: 86 return 87 88 for part in debug.split(','): 89 glob = None 90 value = None 91 pair = part.split(':') 92 if len(pair) == 1: 93 # assume only the value 94 value = int(pair[0]) 95 elif len(pair) == 2: 96 glob, value = pair 97 value = int(value) 98 else: 99 self.warning("Cannot parse GStreamer debug setting '%s'." % 100 part) 101 continue 102 103 if glob: 104 try: 105 # value has to be an integer 106 gst.debug_set_threshold_for_name(glob, value) 107 except TypeError: 108 self.warning("Cannot set glob %s to value %s" % ( 109 glob, value)) 110 else: 111 gst.debug_set_default_threshold(value) 112 113 self.comp.uiState.set('gst-debug', debug)
114
115 - def remote_eatFrom(self, eaterAlias, fullFeedId, host, port):
116 """ 117 Tell the component the host and port for the FeedServer through which 118 it can connect a local eater to a remote feeder to eat the given 119 fullFeedId. 120 121 Called on by the manager-side ComponentAvatar. 122 """ 123 if self._feederFeedServer.get(eaterAlias): 124 if self._feederFeedServer[eaterAlias] == (fullFeedId, host, port): 125 self.debug("Feed:%r is the same as the current one. "\ 126 "Request ignored.", (fullFeedId, host, port)) 127 return 128 self._feederFeedServer[eaterAlias] = (fullFeedId, host, port) 129 return self.connectEater(eaterAlias)
130
131 - def _getAuthenticatorForFeed(self, eaterAliasOrFeedName):
132 # The avatarId on the keycards issued by the authenticator will 133 # identify us to the remote component. Attempt to use our 134 # fullFeedId, for debugging porpoises. 135 if hasattr(self.authenticator, 'copy'): 136 tup = common.parseComponentId(self.authenticator.avatarId) 137 flowName, componentName = tup 138 fullFeedId = common.fullFeedId(flowName, componentName, 139 eaterAliasOrFeedName) 140 return self.authenticator.copy(fullFeedId) 141 else: 142 return self.authenticator
143
144 - def connectEater(self, eaterAlias):
145 """ 146 Connect one of the medium's component's eaters to a remote feed. 147 Called by the component, both on initial connection and for 148 reconnecting. 149 150 @returns: deferred that will fire with a value of None 151 """ 152 # FIXME: There's no indication if the connection was made or not 153 154 def gotFeed((feedId, fd)): 155 self._feederPendingConnections.pop(eaterAlias, None) 156 self.comp.eatFromFD(eaterAlias, feedId, fd)
157 158 if eaterAlias not in self._feederFeedServer: 159 self.debug("eatFrom() hasn't been called yet for eater %s", 160 eaterAlias) 161 # unclear if this function should have a return value at 162 # all... 163 return defer.succeed(None) 164 165 (fullFeedId, host, port) = self._feederFeedServer[eaterAlias] 166 167 cancel = self._feederPendingConnections.pop(eaterAlias, None) 168 if cancel: 169 self.debug('cancelling previous connection attempt on %s', 170 eaterAlias) 171 cancel() 172 173 client = feed.FeedMedium(logName=self.comp.name) 174 175 d = client.requestFeed(host, port, 176 self._getAuthenticatorForFeed(eaterAlias), 177 fullFeedId) 178 self._feederPendingConnections[eaterAlias] = client.stopConnecting 179 d.addCallback(gotFeed) 180 return d
181
182 - def remote_feedTo(self, feederName, fullFeedId, host, port):
183 """ 184 Tell the component to feed the given feed to the receiving component 185 accessible through the FeedServer on the given host and port. 186 187 Called on by the manager-side ComponentAvatar. 188 """ 189 self._eaterFeedServer[fullFeedId] = (host, port) 190 self.connectFeeder(feederName, fullFeedId)
191
192 - def connectFeeder(self, feederName, fullFeedId):
193 """ 194 Tell the component to feed the given feed to the receiving component 195 accessible through the FeedServer on the given host and port. 196 197 Called on by the manager-side ComponentAvatar. 198 """ 199 200 def gotFeed((fullFeedId, fd)): 201 self._eaterPendingConnections.pop(feederName, None) 202 self.comp.feedToFD(feederName, fd, os.close, fullFeedId)
203 204 if fullFeedId not in self._eaterFeedServer: 205 self.debug("feedTo() hasn't been called yet for feeder %s", 206 feederName) 207 # unclear if this function should have a return value at 208 # all... 209 return defer.succeed(None) 210 211 host, port = self._eaterFeedServer[fullFeedId] 212 213 # probably should key on feederName as well 214 cancel = self._eaterPendingConnections.pop(fullFeedId, None) 215 if cancel: 216 self.debug('cancelling previous connection attempt on %s', 217 feederName) 218 cancel() 219 220 client = feed.FeedMedium(logName=self.comp.name) 221 222 d = client.sendFeed(host, port, 223 self._getAuthenticatorForFeed(feederName), 224 fullFeedId) 225 self._eaterPendingConnections[feederName] = client.stopConnecting 226 d.addCallback(gotFeed) 227 return d 228
229 - def remote_provideMasterClock(self, port):
230 """ 231 Tells the component to start providing a master clock on the given 232 UDP port. 233 Can only be called if setup() has been called on the component. 234 235 The IP address returned is the local IP the clock is listening on. 236 237 @returns: (ip, port, base_time) 238 @rtype: tuple of (str, int, long) 239 """ 240 self.debug('remote_provideMasterClock(port=%r)' % port) 241 return self.comp.provide_master_clock(port)
242
243 - def remote_getMasterClockInfo(self):
244 """ 245 Return the clock master info created by a previous call 246 to provideMasterClock. 247 248 @returns: (ip, port, base_time) 249 @rtype: tuple of (str, int, long) 250 """ 251 return self.comp.get_master_clock()
252
253 - def remote_setMasterClock(self, ip, port, base_time):
254 return self.comp.set_master_clock(ip, port, base_time)
255
256 - def remote_effect(self, effectName, methodName, *args, **kwargs):
257 """ 258 Invoke the given methodName on the given effectName in this component. 259 The effect should implement effect_(methodName) to receive the call. 260 """ 261 self.debug("calling %s on effect %s" % (methodName, effectName)) 262 if not effectName in self.comp.effects: 263 raise errors.UnknownEffectError(effectName) 264 effect = self.comp.effects[effectName] 265 if not hasattr(effect, "effect_%s" % methodName): 266 raise errors.NoMethodError("%s on effect %s" % (methodName, 267 effectName)) 268 method = getattr(effect, "effect_%s" % methodName) 269 try: 270 result = method(*args, **kwargs) 271 except TypeError: 272 msg = "effect method %s did not accept %s and %s" % ( 273 methodName, args, kwargs) 274 self.debug(msg) 275 raise errors.RemoteRunError(msg) 276 self.debug("effect: result: %r" % result) 277 return result
278
279 - def remote_dumpGstreamerDotFile(self, filename):
280 self.comp.dump_gstreamer_debug_dot_file(filename)
281 282 from feedcomponent010 import FeedComponent 283 284 FeedComponent.componentMediumClass = FeedComponentMedium 285 286
287 -class ParseLaunchComponent(FeedComponent):
288 """A component using gst-launch syntax 289 290 @cvar checkTimestamp: whether to check continuity of timestamps for eaters 291 @cvar checkOffset: whether to check continuity of offsets for 292 eaters 293 """ 294 295 DELIMITER = '@' 296 297 # can be set by subclasses 298 checkTimestamp = False 299 checkOffset = False 300 301 # keep these as class variables for the tests 302 FDSRC_TMPL = 'fdsrc name=%(name)s' 303 DEPAY_TMPL = 'gdpdepay name=%(name)s-depay' 304 FEEDER_TMPL = 'gdppay name=%(name)s-pay ! multifdsink sync=false '\ 305 'name=%(name)s buffers-max=500 buffers-soft-max=450 '\ 306 'recover-policy=1' 307 EATER_TMPL = None 308
309 - def init(self):
310 if not gstreamer.get_plugin_version('coreelements'): 311 raise errors.MissingElementError('identity') 312 if not gstreamer.element_factory_has_property('identity', 313 'check-imperfect-timestamp'): 314 self.checkTimestamp = False 315 self.checkOffset = False 316 self.addMessage( 317 messages.Info(T_(N_( 318 "You will get more debugging information " 319 "if you upgrade to GStreamer 0.10.13 or later.")))) 320 321 self.EATER_TMPL = self.FDSRC_TMPL + ' %(queue)s ' + self.DEPAY_TMPL 322 if self.checkTimestamp or self.checkOffset: 323 self.EATER_TMPL += " ! identity name=%(name)s-identity silent=TRUE" 324 if self.checkTimestamp: 325 self.EATER_TMPL += " check-imperfect-timestamp=1" 326 if self.checkOffset: 327 self.EATER_TMPL += " check-imperfect-offset=1"
328 329 ### FeedComponent interface implementations 330
331 - def create_pipeline(self):
332 try: 333 unparsed = self.get_pipeline_string(self.config['properties']) 334 except errors.MissingElementError, e: 335 self.warning('Missing %s element' % e.args[0]) 336 m = messages.Error(T_(N_( 337 "The worker does not have the '%s' element installed.\n" 338 "Please install the necessary plug-in and restart " 339 "the component.\n"), e.args[0])) 340 self.addMessage(m) 341 raise errors.ComponentSetupHandledError(e) 342 343 self.pipeline_string = self.parse_pipeline(unparsed) 344 345 try: 346 pipeline = gst.parse_launch(self.pipeline_string) 347 except gobject.GError, e: 348 self.warning('Could not parse pipeline: %s' % e.message) 349 m = messages.Error(T_(N_( 350 "GStreamer error: could not parse component pipeline.")), 351 debug=e.message) 352 self.addMessage(m) 353 raise errors.PipelineParseError(e.message) 354 355 return pipeline
356
357 - def set_pipeline(self, pipeline):
358 FeedComponent.set_pipeline(self, pipeline) 359 if self.checkTimestamp or self.checkOffset: 360 watchElements = dict([ 361 (e.elementName + '-identity', e) 362 for e in self.eaters.values()]) 363 self.install_eater_continuity_watch(watchElements) 364 self.configure_pipeline(self.pipeline, self.config['properties'])
365 366 ### ParseLaunchComponent interface for subclasses 367
368 - def get_pipeline_string(self, properties):
369 """ 370 Method that must be implemented by subclasses to produce the 371 gstparse string for the component's pipeline. Subclasses should 372 not chain up; this method raises a NotImplemented error. 373 374 Returns: a new pipeline string representation. 375 """ 376 raise NotImplementedError('subclasses should implement ' 377 'get_pipeline_string')
378
379 - def configure_pipeline(self, pipeline, properties):
380 """ 381 Method that can be implemented by subclasses if they wish to 382 interact with the pipeline after it has been created and set 383 on the component. 384 385 This could include attaching signals and bus handlers. 386 """ 387 pass
388 389 ### private methods 390
391 - def add_default_eater_feeder(self, pipeline):
392 if len(self.eaters) == 1: 393 eater = 'eater:' + self.eaters.keys()[0] 394 if eater not in pipeline: 395 pipeline = '@' + eater + '@ ! ' + pipeline 396 if len(self.feeders) == 1: 397 feeder = 'feeder:' + self.feeders.keys()[0] 398 if feeder not in pipeline: 399 pipeline = pipeline + ' ! @' + feeder + '@' 400 return pipeline
401
402 - def parse_tmpl(self, pipeline, templatizers):
403 """ 404 Expand the given pipeline string representation by substituting 405 blocks between '@' with a filled-in template. 406 407 @param pipeline: a pipeline string representation with variables 408 @param templatizers: A dict of prefix => procedure. Template 409 blocks in the pipeline will be replaced 410 with the result of calling the procedure 411 with what is left of the template after 412 taking off the prefix. 413 @returns: a new pipeline string representation. 414 """ 415 assert pipeline != '' 416 417 # verify the template has an even number of delimiters 418 if pipeline.count(self.DELIMITER) % 2 != 0: 419 raise TypeError("'%s' contains an odd number of '%s'" 420 % (pipeline, self.DELIMITER)) 421 422 out = [] 423 for i, block in enumerate(pipeline.split(self.DELIMITER)): 424 # when splitting, the even-indexed members will remain, and 425 # the odd-indexed members are the blocks to be substituted 426 if i % 2 == 0: 427 out.append(block) 428 else: 429 block = block.strip() 430 try: 431 pos = block.index(':') 432 except ValueError: 433 raise TypeError("Template %r has no colon" % (block, )) 434 prefix = block[:pos+1] 435 if prefix not in templatizers: 436 raise TypeError("Template %r has invalid prefix %r" 437 % (block, prefix)) 438 out.append(templatizers[prefix](block[pos+1:])) 439 return ''.join(out)
440
441 - def parse_pipeline(self, pipeline):
442 pipeline = " ".join(pipeline.split()) 443 self.debug('Creating pipeline, template is %s', pipeline) 444 445 if pipeline == '' and not self.eaters: 446 raise TypeError("Need a pipeline or a eater") 447 448 if pipeline == '': 449 # code of dubious value 450 assert self.eaters 451 pipeline = 'fakesink signal-handoffs=1 silent=1 name=sink' 452 453 pipeline = self.add_default_eater_feeder(pipeline) 454 pipeline = self.parse_tmpl(pipeline, 455 {'eater:': self.get_eater_template, 456 'feeder:': self.get_feeder_template}) 457 458 self.debug('pipeline is %s', pipeline) 459 assert self.DELIMITER not in pipeline 460 461 return pipeline
462
463 - def get_eater_template(self, eaterAlias):
464 queue = self.get_queue_string(eaterAlias) 465 elementName = self.eaters[eaterAlias].elementName 466 467 return self.EATER_TMPL % {'name': elementName, 'queue': queue}
468
469 - def get_feeder_template(self, feederName):
470 elementName = self.feeders[feederName].elementName 471 return self.FEEDER_TMPL % {'name': elementName}
472
473 - def get_queue_string(self, eaterAlias):
474 """ 475 Return a parse-launch string to join the fdsrc eater element and 476 the depayer, for example '!' or '! queue !'. The string may have 477 no format strings. 478 """ 479 return '!'
480
481 - def get_eater_srcpad(self, eaterAlias):
482 """ 483 Method that returns the source pad of the final element in an eater. 484 485 @returns: the GStreamer source pad of the final element in an eater 486 @rtype: L{gst.Pad} 487 """ 488 e = self.eaters[eaterAlias] 489 identity = self.get_element(e.elementName + '-identity') 490 depay = self.get_element(e.depayName) 491 srcpad = depay.get_pad("src") 492 if identity: 493 srcpad = identity.get_pad("src") 494 return srcpad
495 496
497 -class Effect(log.Loggable):
498 """ 499 I am a part of a feed component for a specific group 500 of functionality. 501 502 @ivar name: name of the effect 503 @type name: string 504 @ivar component: component owning the effect 505 @type component: L{FeedComponent} 506 """ 507 logCategory = "effect" 508
509 - def __init__(self, name):
510 """ 511 @param name: the name of the effect 512 """ 513 self.name = name 514 self.setComponent(None)
515
516 - def setComponent(self, component):
517 """ 518 Set the given component as the effect's owner. 519 520 @param component: the component to set as an owner of this effect 521 @type component: L{FeedComponent} 522 """ 523 self.component = component 524 self.setUIState(component and component.uiState or None)
525
526 - def setUIState(self, state):
527 """ 528 Set the given UI state on the effect. This method is ideal for 529 adding keys to the UI state. 530 531 @param state: the UI state for the component to use. 532 @type state: L{flumotion.common.componentui.WorkerComponentUIState} 533 """ 534 self.uiState = state
535
536 - def getComponent(self):
537 """ 538 Get the component owning this effect. 539 540 @rtype: L{FeedComponent} 541 """ 542 return self.component
543 544
545 -class PostProcEffect (Effect):
546 """ 547 I am an effect that is plugged in the pipeline to do a post processing 548 job and can be chained to other effect of the same class. 549 550 @ivar name: name of the effect 551 @type name: string 552 @ivar component: component owning the effect 553 @type component: L{FeedComponent} 554 @ivar sourcePad: pad of the source after which I'm plugged 555 @type sourcePad: L{GstPad} 556 @ivar effectBin: gstreamer bin doing the post processing effect 557 @type source: L{GstBin} 558 @ivar pipeline: pipeline holding the gstreamer elements 559 @type pipeline: L{GstPipeline} 560 561 """ 562 logCategory = "effect" 563
564 - def __init__(self, name, sourcePad, effectBin, pipeline):
565 """ 566 @param name: the name of the effect 567 @param sourcePad: pad of the source after which I'm plugged 568 @param effectBin: gstreamer bin doing the post processing effect 569 @param pipeline: pipeline holding the gstreamer elements 570 """ 571 Effect.__init__(self, name) 572 self.sourcePad = sourcePad 573 self.effectBin = effectBin 574 self.pipeline = pipeline 575 self.plugged = False
576
577 - def plug(self):
578 """ 579 Plug the effect in the pipeline unlinking the source element with it's 580 downstream peer 581 """ 582 if self.plugged == True: 583 return 584 # Unlink the source pad of the source element after which we need 585 # are going to be plugged 586 peerSinkPad = self.sourcePad 587 peerSrcPad = peerSinkPad.get_peer() 588 peerSinkPad.unlink(peerSrcPad) 589 590 # Add the deinterlacer bin to the pipeline 591 self.effectBin.set_state(gst.STATE_PLAYING) 592 self.pipeline.add(self.effectBin) 593 594 # link it with the element src pad and its peer's sink pad 595 peerSinkPad.link(self.effectBin.get_pad('sink')) 596 self.effectBin.get_pad('src').link(peerSrcPad) 597 self.plugged = True
598 599
600 -class MultiInputParseLaunchComponent(ParseLaunchComponent):
601 """ 602 This class provides for multi-input ParseLaunchComponents, such as muxers, 603 with a queue attached to each input. 604 """ 605 QUEUE_SIZE_BUFFERS = 16 606 LINK_MUXER = True 607
608 - def get_muxer_string(self, properties):
609 """ 610 Return a gst-parse description of the muxer, which 611 must be named 'muxer' 612 """ 613 raise errors.NotImplementedError("Implement in a subclass")
614
615 - def get_queue_string(self, eaterAlias):
616 name = self.eaters[eaterAlias].elementName 617 return ("! queue name=%s-queue max-size-buffers=%d !" 618 % (name, self.QUEUE_SIZE_BUFFERS))
619
620 - def get_pipeline_string(self, properties):
621 eaters = self.config.get('eater', {}) 622 sources = self.config.get('source', []) 623 if eaters == {} and sources != []: 624 # for upgrade without manager restart 625 feeds = [] 626 for feed in sources: 627 if not ':' in feed: 628 feed = '%s:default' % feed 629 feeds.append(feed) 630 eaters = {'default': [(x, 'default') for x in feeds]} 631 632 pipeline = '' 633 for e in eaters: 634 for feed, alias in eaters[e]: 635 pipeline += '@ eater:%s @ ' % alias 636 if self.LINK_MUXER: 637 pipeline += ' ! muxer. ' 638 639 pipeline += self.get_muxer_string(properties) + ' ' 640 641 return pipeline
642
643 - def unblock_eater(self, eaterAlias):
644 # Firstly, ensure that any push in progress is guaranteed to return, 645 # by temporarily enlarging the queue 646 queuename = self.eaters[eaterAlias].elementName + '-queue' 647 queue = self.pipeline.get_by_name(queuename) 648 649 size = queue.get_property("max-size-buffers") 650 queue.set_property("max-size-buffers", size + 1) 651 652 # So, now it's guaranteed to return. However, we want to return the 653 # queue size to its original value. Doing this in a thread-safe manner 654 # is rather tricky... 655 656 def _block_cb(pad, blocked): 657 # This is called from streaming threads, but we don't do anything 658 # here so it's safe. 659 pass
660 661 def _underrun_cb(element): 662 # Called from a streaming thread. The queue element does not hold 663 # the queue lock when this is called, so we block our sinkpad, 664 # then re-check the current level. 665 pad = element.get_pad("sink") 666 pad.set_blocked_async(True, _block_cb) 667 level = element.get_property("current-level-buffers") 668 if level < self.QUEUE_SIZE_BUFFERS: 669 element.set_property('max-size-buffers', 670 self.QUEUE_SIZE_BUFFERS) 671 element.disconnect(signalid) 672 pad.set_blocked_async(False, _block_cb)
673 674 signalid = queue.connect("underrun", _underrun_cb) 675 676
677 -class ReconfigurableComponent(ParseLaunchComponent):
678 679 disconnectedPads = False 680
681 - def _get_base_pipeline_string(self):
682 """Should be overrided by subclasses to provide the pipeline the 683 component uses. 684 """ 685 return ""
686
687 - def init(self):
688 self.EATER_TMPL += ' ! queue name=input-%(name)s' 689 self._reset_count = 0 690 691 self.uiState.addKey('reset-count', 0)
692
693 - def setup_completed(self):
696 697 # Public methods 698
699 - def get_output_elements(self):
700 return [self.get_element(f.elementName + '-pay') 701 for f in self.feeders.values()]
702
703 - def get_input_elements(self):
704 return [self.get_element('input-' + f.elementName) 705 for f in self.eaters.values()]
706
707 - def get_base_pipeline_string(self):
708 raise NotImplementedError('Subclasses should implement ' 709 'get_base_pipeline_string')
710
711 - def get_eater_srcpad(self, eaterAlias):
712 e = self.eaters[eaterAlias] 713 inputq = self.get_element('input-' + e.elementName) 714 return inputq.get_pad('src')
715 716 # Private methods 717
718 - def _install_changes_probes(self):
719 """ 720 Add the event probes that will check for the flumotion-reset event. 721 722 Those will trigger the pipeline's blocking and posterior reload 723 """ 724 # FIXME: Add documentation 725 726 def input_reset_event(pad, event): 727 if event.type != gst.EVENT_CUSTOM_DOWNSTREAM: 728 return True 729 if event.get_structure().get_name() != 'flumotion-reset': 730 return True 731 if self.disconnectedPads: 732 return False 733 734 self.log('RESET: in reset event received on input pad %r', pad) 735 self._reset_count = len(self.feeders) 736 # Block all the eaters and send an eos downstream the pipeline to 737 # drain all the elements. It will also unlink the pipeline from the 738 # input queues. 739 self._block_eaters() 740 # Do not propagate the event. It is sent from the other side of 741 # the pipeline after it has been drained. 742 return False
743 744 def output_reset_event(pad, event): 745 if event.type != gst.EVENT_EOS: 746 return True 747 748 self.log('RESET: out reset event received on output pad %r', pad) 749 # TODO: Can we use EVENT_FLUSH_{START,STOP} for the same purpose? 750 # The component only waits for the first eos to come. After that 751 # all the elements inside the pipeline will be down and won't 752 # process any more events. 753 # Pads cannot be blocked from the streaming thread. They have to be 754 # manipulated from outside according gstreamer's documentation 755 self._reset_count -= 1 756 if self._reset_count > 0: 757 return False 758 759 self._send_reset_event() 760 reactor.callFromThread(self._on_pipeline_drained) 761 # Do not let the eos pass. 762 return False
763 764 self.log('RESET: installing event probes for detecting changes') 765 # Listen for incoming flumotion-reset events on eaters 766 for elem in self.get_input_elements(): 767 self.debug('RESET: adding event probe for %s', elem.get_name()) 768 elem.get_pad('sink').add_event_probe(input_reset_event) 769 770 for elem in self.get_output_elements(): 771 self.debug('RESET: adding event probe for %s', elem.get_name()) 772 elem.get_pad('sink').add_event_probe(output_reset_event) 773
774 - def _block_eaters(self):
775 """ 776 Function that blocks all the identities of the eaters 777 """ 778 for elem in self.get_input_elements(): 779 pad = elem.get_pad('src') 780 self.debug("RESET: Blocking pad %s", pad) 781 pad.set_blocked_async(True, self._on_eater_blocked)
782
783 - def _unblock_eaters(self):
784 for elem in self.get_input_elements(): 785 pad = elem.get_pad('src') 786 self.debug("RESET: Unblocking pad %s", pad) 787 pad.set_blocked_async(False, self._on_eater_blocked)
788
789 - def _send_reset_event(self):
790 event = gst.event_new_custom(gst.EVENT_CUSTOM_DOWNSTREAM, 791 gst.Structure('flumotion-reset')) 792 793 for elem in self.get_output_elements(): 794 pad = elem.get_pad('sink') 795 pad.send_event(event)
796 810
811 - def _remove_pipeline(self, pipeline, element, end, done=None):
812 if done is None: 813 done = [] 814 if not element: 815 return 816 if element in done: 817 return 818 if element in end: 819 return 820 821 for src in element.src_pads(): 822 self.log('going to start by pad %r', src) 823 if not src.get_peer(): 824 continue 825 peer = src.get_peer().get_parent() 826 self._remove_pipeline(pipeline, peer, end, done) 827 done.append(peer) 828 element.unlink(peer) 829 830 self.log("RESET: removing old element %s from pipeline", element) 831 element.set_state(gst.STATE_NULL) 832 pipeline.remove(element)
833
834 - def _rebuild_pipeline(self):
835 # TODO: Probably this would be easier and clearer if we used a bin to 836 # wrap the component's functionality.Then the component would only need 837 # to reset the bin and connect the resulting pads to the {eat,feed}ers. 838 839 self.log('RESET: Going to rebuild the pipeline') 840 841 base_pipe = self._get_base_pipeline_string() 842 843 # Place a fakesrc element so we can know from where to start 844 # rebuilding the pipeline. 845 fake_pipeline = 'fakesrc name=start ! %s' % base_pipe 846 pipeline = gst.parse_launch(fake_pipeline) 847 848 def move_element(element, orig, dest): 849 if not element: 850 return 851 if element in done: 852 return 853 854 to_link = [] 855 done.append(element) 856 self.log("RESET: going to remove %s", element) 857 for src in element.src_pads(): 858 self.log("RESET: got src pad element %s", src) 859 if not src.get_peer(): 860 continue 861 peer = src.get_peer().get_parent() 862 to_link.append(peer) 863 864 move_element(to_link[-1], orig, dest) 865 866 self._unlink_pads(element, [gst.PAD_SRC, gst.PAD_SINK]) 867 orig.remove(element) 868 dest.add(element) 869 870 self.log("RESET: new element %s added to the pipeline", element) 871 for peer in to_link: 872 self.log("RESET: linking peers %s -> %s", element, peer) 873 element.link(peer)
874 875 done = [] 876 start = pipeline.get_by_name('start').get_pad('src').get_peer() 877 move_element(start.get_parent(), pipeline, self.pipeline) 878 879 # Link eaters to the first element in the pipeline 880 # By now we there can only be two situations: 881 # 1. Encoders, where there is only one eater connected to the encoder 882 # 2. Muxers, where multiple eaters are connected directly to the muxer 883 # TODO: Probably we'd like the link process to check the caps 884 if len(self.get_input_elements()) == 1: 885 elem = self.get_input_elements()[0] 886 self.log("RESET: linking eater %r to %r", elem, done[0]) 887 elem.link(done[0]) 888 889 # Link the last element in the pipeline to the feeders. 890 if len(self.get_output_elements()) == 1: 891 elem = self.get_output_elements()[0] 892 self.log("RESET: linking %r to feeder %r", done[-1], elem) 893 done[-1].link(elem) 894 895 self.configure_pipeline(self.pipeline, self.config['properties']) 896 self.pipeline.set_state(gst.STATE_PLAYING) 897 self._unblock_eaters() 898 899 resets = self.uiState.get('reset-count') 900 self.uiState.set('reset-count', resets+1) 901 902 # Callbacks 903
904 - def _on_pad_blocked(self, pad, blocked):
905 self.log("RESET: Pad %s %s", pad, 906 (blocked and "blocked") or "unblocked")
907
908 - def _on_eater_blocked(self, pad, blocked):
909 self._on_pad_blocked(pad, blocked) 910 if blocked: 911 peer = pad.get_peer() 912 peer.send_event(gst.event_new_eos())
913 #self._unlink_pads(pad.get_parent(), [gst.PAD_SRC]) 914
915 - def _on_pipeline_drained(self):
916 self.debug('RESET: Proceed to unlink pipeline') 917 start = self.get_input_elements() 918 end = self.get_output_elements() 919 done = [] 920 for element in start: 921 element = element.get_pad('src').get_peer().get_parent() 922 self._remove_pipeline(self.pipeline, element, end, done) 923 self._rebuild_pipeline()
924 925
926 -class EncoderComponent(ParseLaunchComponent):
927 """ 928 Component that is reconfigured when new changes arrive through the 929 flumotion-reset event (sent by the fms producer). 930 """ 931 pass
932 933
934 -class MuxerComponent(MultiInputParseLaunchComponent):
935 """ 936 This class provides for multi-input ParseLaunchComponents, such as muxers, 937 that handle flumotion-reset events for reconfiguration. 938 """ 939 940 LINK_MUXER = False 941 944
945 - def configure_pipeline(self, pipeline, properties):
946 """ 947 Method not overridable by muxer subclasses. 948 """ 949 # link the muxers' sink pads when data comes in so we get compatible 950 # sink pads with input data 951 # gone are the days when we know we only have one pad template in 952 # muxers 953 self.fired_eaters = 0 954 self._probes = {} # depay element -> id 955 956 def buffer_probe_cb(a, b, depay, eaterAlias): 957 pad = depay.get_pad("src") 958 caps = pad.get_negotiated_caps() 959 if not caps: 960 return False 961 srcpad_to_link = self.get_eater_srcpad(eaterAlias) 962 muxer = self.pipeline.get_by_name("muxer") 963 self.debug("Trying to get compatible pad for pad %r with caps %s", 964 srcpad_to_link, caps) 965 linkpad = self.get_link_pad(muxer, srcpad_to_link, caps) 966 self.debug("Got link pad %r", linkpad) 967 if not linkpad: 968 m = messages.Error(T_(N_( 969 "The incoming data is not compatible with this muxer.")), 970 debug="Caps %s not compatible with this muxer." % ( 971 caps.to_string())) 972 self.addMessage(m) 973 # this is the streaming thread, cannot set state here 974 # so we do it in the mainloop 975 reactor.callLater(0, self.pipeline.set_state, gst.STATE_NULL) 976 return True 977 srcpad_to_link.link(linkpad) 978 depay.get_pad("src").remove_buffer_probe(self._probes[depay]) 979 if srcpad_to_link.is_blocked(): 980 self.is_blocked_cb(srcpad_to_link, True) 981 else: 982 srcpad_to_link.set_blocked_async(True, self.is_blocked_cb) 983 return True
984 985 for e in self.eaters: 986 depay = self.get_element(self.eaters[e].depayName) 987 self._probes[depay] = \ 988 depay.get_pad("src").add_buffer_probe( 989 buffer_probe_cb, depay, e)
990
991 - def is_blocked_cb(self, pad, is_blocked):
992 if is_blocked: 993 self.fired_eaters = self.fired_eaters + 1 994 if self.fired_eaters == len(self.eaters): 995 self.debug("All pads are now blocked") 996 self.disconnectedPads = False 997 for e in self.eaters: 998 srcpad = self.get_eater_srcpad(e) 999 srcpad.set_blocked_async(False, self.is_blocked_cb)
1000