1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 from flumotion.component import feedcomponent
23 from flumotion.common import errors, messages
24 from flumotion.common.planet import moods
25 from twisted.internet import defer
26 import threading
27 from flumotion.common.messages import N_
28 T_ = messages.gettexter('flumotion')
29 import gst
30
try:
    # icalendar and dateutil are optional dependencies; HAVE_ICAL records
    # whether both could be imported.
    from icalendar import Calendar
    from dateutil import rrule
    HAVE_ICAL = True
except ImportError:
    # Only a missing module means "not available"; anything else
    # (KeyboardInterrupt, SystemExit, ...) must not be swallowed here.
    HAVE_ICAL = False
38
45
46 -class Switch(feedcomponent.MultiInputParseLaunchComponent):
47 logCategory = 'comb-switch'
48 componentMediumClass = SwitchMedium
49
51 self.uiState.addKey("active-eater")
52 self.icalScheduler = None
53
54
55 self._idealEater = "master"
56
57
58
59
60 self._eaterReadyDefers = { "master": None, "backup": None }
61 self._started = False
62
98 d.addCallback(cb)
99 return d
100
102 raise errors.NotImplementedError('subclasses should implement '
103 'switch_to')
104
106
107 for eaterAlias in self.eaters:
108 if eaterSubstring in eaterAlias:
109 return self.eaters[eaterAlias].isActive()
110 return True
111
112
113
118
123
129
135
137 """
138 @param eaterSubstring: either "master" or "backup"
139 @param startOrStop: True if start of event, False if stop
140 """
141 if eaterSubstring != "master" and eaterSubstring != "backup":
142 self.warning("switch_to_for_event should be called with 'master'"
143 " or 'backup'")
144 return None
145 self._idealEater = eaterSubstring
146 d = defer.maybeDeferred(self.switch_to, eaterSubstring)
147 def switch_to_cb(res):
148 if not res :
149 startOrStopStr = "stopped"
150 if startOrStop:
151 startOrStopStr = "started"
152 warnStr = "Event %s but could not switch to %s" \
153 ", will switch when %s is back" % (startOrStopStr,
154 eaterSubstring, eaterSubstring)
155 self.warning(warnStr)
156 m = messages.Warning(T_(N_(warnStr)),
157 id="error-scheduling-event")
158 self.addMessage(m)
159 self._eaterReadyDefers[eaterSubstring] = defer.Deferred()
160 self._eaterReadyDefers[eaterSubstring].addCallback(
161 lambda x: self.switch_to(eaterSubstring))
162 otherEater = "backup"
163 if eaterSubstring == "backup":
164 otherEater = "master"
165 self._eaterReadyDefers[otherEater] = None
166 d.addCallback(switch_to_cb)
167 return d
168
170 logCategory = "comb-single-switch"
171
173 Switch.init(self)
174 self.switchElement = None
175
176 self.switchPads = {}
177
179 pipeline = "switch name=switch ! " \
180 "identity silent=true single-segment=true name=iden "
181
182 for eaterAlias in self.eaters:
183 pipeline += '@ eater:%s @ ! switch. ' % (eaterAlias,)
184
185 pipeline += 'iden.'
186
187 return pipeline
188
214
216 if not self.switchElement:
217 self.warning("switch_to called with eater %s but before pipeline "
218 "configured")
219 return False
220 if not eater in [ "backup", "master" ]:
221 self.warning ("%s is not master or backup", eater)
222 return False
223 if self.is_active(eater):
224 self.switchElement.set_property("active-pad",
225 self.switchPads[eater])
226 self.uiState.set("active-eater", eater)
227 return True
228 else:
229 self.warning("Could not switch to %s because the %s eater "
230 "is not active." % (eater, eater))
231 return False
232
234 Switch.eaterSetActive(self, feedId)
235 eaterName = self.get_eater_name_for_feed_id(feedId)
236 d = self._eaterReadyDefers[eaterName]
237 if d:
238 d.callback(True)
239 self._eaterReadyDefers[eaterName] = None
240
242 logCategory = "comb-av-switch"
243
245 Switch.init(self)
246 self.audioSwitchElement = None
247 self.videoSwitchElement = None
248
249 self.switchPads = {}
250 self._startTimes = {}
251 self._startTimeProbeIds = {}
252 self._padProbeLock = threading.Lock()
253 self._switchLock = threading.Lock()
254 self.pads_awaiting_block = []
255 self.padsBlockedDefer = None
256
258 d = Switch.do_check(self)
259 def checkConfig(result):
260 self.debug("checking config")
261 props = self.config['properties']
262 videoParams = {}
263 audioParams = {}
264 videoParams["video-width"] = props.get("video-width", None)
265 videoParams["video-height"] = props.get("video-height", None)
266 videoParams["video-framerate"] = props.get("video-framerate", None)
267 videoParams["video-pixel-aspect-ratio"] = props.get("video-pixel-aspect-ratio", None)
268 audioParams["audio-channels"] = props.get("audio-channels", None)
269 audioParams["audio-samplerate"] = props.get("audio-samplerate", None)
270
271 nonExistantVideoParams = []
272 existsVideoParam = False
273 allVideoParams = True
274 for p in videoParams:
275 if videoParams[p] == None:
276 allVideoParams = False
277 nonExistantVideoParams.append(p)
278 else:
279 existsVideoParam = True
280 self.debug("exists video param: %d all: %d nonexistant: %r",
281 existsVideoParam, allVideoParams, nonExistantVideoParams)
282 if not allVideoParams and existsVideoParam:
283
284 m = messages.Error(T_(N_(
285 "Video parameter(s) were specified but not all. "
286 "Missing parameters are: %r" % nonExistantVideoParams)),
287 id="video-params-not-specified")
288 self.addMessage(m)
289 nonExistantAudioParams = []
290 existsAudioParam = False
291 allAudioParams = True
292 for p in audioParams:
293 if audioParams[p] == None:
294 allAudioParams = False
295 nonExistantAudioParams.append(p)
296 else:
297 existsAudioParam = True
298 if not allAudioParams and existsAudioParam:
299
300 m = messages.Error(T_(N_(
301 "Audio parameter(s) were specified but not all. "
302 "Missing parameters are: %r" % nonExistantAudioParams)),
303 id="audio-params-not-specified")
304 self.addMessage(m)
305 return result
306 d.addCallback(checkConfig)
307 return d
308
310 eaters = self.eater_names
311 videoForceCapsTemplate = ""
312 audioForceCapsTemplate = ""
313 if properties.get("video-width", None):
314 width = properties["video-width"]
315 height = properties["video-height"]
316 par = properties["video-pixel-aspect-ratio"]
317 framerate = properties["video-framerate"]
318 videoForceCapsTemplate = \
319 "ffmpegcolorspace ! videorate ! videoscale !" \
320 " capsfilter caps=video/x-raw-yuv,width=%d,height=%d," \
321 "framerate=%d/%d,pixel-aspect-ratio=%d/%d," \
322 "format=(fourcc)I420 " \
323 "name=capsfilter-%%(eaterName)s ! " % (width,
324 height, framerate[0], framerate[1], par[0], par[1])
325 if self.config.get("audio-channels", None):
326 channels = self.config["audio-channels"]
327 samplerate = self.config["audio-samplerate"]
328 audioForceCapsTemplate = \
329 "audioconvert ! audioconvert ! capsfilter caps=" \
330 "audio/x-raw-int,channels=%d,samplerate=%d," \
331 "width=16,depth=16,signed=true " \
332 "name=capsfilter-%%(eaterName)s ! " % (
333 channels, samplerate)
334 pipeline = "switch name=vswitch ! " \
335 "identity silent=true single-segment=true name=viden " \
336 "switch name=aswitch ! " \
337 "identity silent=true single-segment=true name=aiden "
338 for eater in eaters:
339 if "video" in eater:
340 tmpl = '@ eater:%%(eaterName)s @ ! %s vswitch. ' % videoForceCapsTemplate
341 if "audio" in eater:
342 tmpl = '@ eater:%%(eaterName)s @ ! %s aswitch. ' % audioForceCapsTemplate
343 pipeline += tmpl % dict(eaterName=eater)
344
345 pipeline += 'viden. ! @feeder:video@ aiden. ! @feeder:audio@'
346 return pipeline
347
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
411 if not (self.videoSwitchElement and self.audioSwitchElement):
412 self.warning("switch_to called with eater %s but before pipeline "
413 "configured")
414 return False
415 if eater not in [ "master", "backup" ]:
416 self.warning("%s is not master or backup", eater)
417 return False
418 if self._switchLock.locked():
419 self.warning("Told to switch to %s while a current switch is going on.", eater)
420 return False
421
422 self._switchLock.acquire()
423 if self.is_active(eater) and self._startTimes == {} and \
424 self.uiState.get("active-eater") != eater:
425 self._startTimes = {"abc":None}
426 self.padsBlockedDefer = defer.Deferred()
427 self.debug("eaterSwitchingTo switching to %s", eater)
428 self.eaterSwitchingTo = eater
429 self._block_switch_sink_pads(True)
430 return self.padsBlockedDefer
431 else:
432 self._switchLock.release()
433 if self.uiState.get("active-eater") == eater:
434 self.warning("Could not switch to %s because it is already active",
435 eater)
436 elif self._startTimes == {}:
437 self.warning("Could not switch to %s because at least "
438 "one of the %s eaters is not active." % (eater, eater))
439 m = messages.Warning(T_(N_(
440 "Could not switch to %s because at least "
441 "one of the %s eaters is not active." % (eater, eater))),
442 id="cannot-switch",
443 priority=40)
444 self.state.append('messages', m)
445 else:
446 self.warning("Could not switch because startTimes is %r",
447 self._startTimes)
448 m = messages.Warning(T_(N_(
449 "Could not switch to %s because "
450 "startTimes is %r." % (eater, self._startTimes))),
451 id="cannot-switch",
452 priority=40)
453 self.state.append('messages', m)
454 return False
455
457 vswTs = self.videoSwitchElement.get_property("last-timestamp")
458 aswTs = self.audioSwitchElement.get_property("last-timestamp")
459 tsToSet = vswTs
460 if aswTs > vswTs:
461 tsToSet = aswTs
462 self.log("Setting stop-value on video switch to %u",
463 tsToSet)
464 self.log("Setting stop-value on audio switch to %u",
465 tsToSet)
466 self.videoSwitchElement.set_property("stop-value",
467 tsToSet)
468 self.audioSwitchElement.set_property("stop-value",
469 tsToSet)
470 message = None
471 if (aswTs > vswTs) and (aswTs - vswTs > gst.SECOND * 10):
472 message = "When switching to %s the other source's video" \
473 " and audio timestamps differ by %u" % (self.eaterSwitchingTo,
474 aswTs - vswTs)
475 elif (vswTs > aswTs) and (vswTs - aswTs > gst.SECOND * 10):
476 message = "When switching to %s the other source's video" \
477 " and audio timestamps differ by %u" % (self.eaterSwitchingTo,
478 vswTs - aswTs)
479 if message:
480 m = messages.Warning(T_(N_(
481 message)),
482 id="large-timestamp-difference",
483 priority=40)
484 self.state.append('messages', m)
485
487 self.log("here with pad %r and blocked %d", pad, blocked)
488 if blocked:
489 if not pad in self.pads_awaiting_block:
490 return
491 self.pads_awaiting_block.remove(pad)
492 self.log("Pads awaiting block are: %r", self.pads_awaiting_block)
493
495 if block:
496 self.pads_awaiting_block = []
497 for eaterName in self.switchPads:
498 if "audio" in eaterName:
499 pad = self.audioSwitchElement.get_pad(
500 self.switchPads[eaterName]).get_peer()
501 else:
502 pad = self.videoSwitchElement.get_pad(
503 self.switchPads[eaterName]).get_peer()
504 if pad:
505 self.pads_awaiting_block.append(pad)
506
507 for eaterName in self.switchPads:
508 if "audio" in eaterName:
509 pad = self.audioSwitchElement.get_pad(
510 self.switchPads[eaterName]).get_peer()
511 else:
512 pad = self.videoSwitchElement.get_pad(
513 self.switchPads[eaterName]).get_peer()
514 if pad:
515 self.debug("Pad: %r blocked being set to: %d", pad, block)
516 ret = pad.set_blocked_async(block, self._block_cb)
517 self.debug("Return of pad block is: %d", ret)
518 self.debug("Pad %r is blocked: %d", pad, pad.is_blocked())
519 if block:
520 self.on_pads_blocked()
521
523 eater = self.eaterSwitchingTo
524 if not eater:
525 self.warning("Eaterswitchingto is None, crying time")
526 self.log("Block callback")
527 self._set_last_timestamp()
528 self.videoSwitchElement.set_property("active-pad",
529 self.switchPads["video-%s" % eater])
530 self.audioSwitchElement.set_property("active-pad",
531 self.switchPads["audio-%s" % eater])
532 self.videoSwitchElement.set_property("queue-buffers",
533 True)
534 self.audioSwitchElement.set_property("queue-buffers",
535 True)
536 self.uiState.set("active-eater", eater)
537 self._add_pad_probes_for_start_time(eater)
538 self._block_switch_sink_pads(False)
539 if self.padsBlockedDefer:
540 self.padsBlockedDefer.callback(True)
541 else:
542 self.warning("Our pad block defer is None, inconsistency time to cry")
543 self.padsBlockedDefer = None
544
546 self.debug("adding buffer probes here for %s", activeEater)
547 for eaterName in ["video-%s" % activeEater, "audio-%s" % activeEater]:
548 if "audio" in eaterName:
549 pad = self.audioSwitchElement.get_pad(
550 self.switchPads[eaterName])
551 else:
552 pad = self.videoSwitchElement.get_pad(
553 self.switchPads[eaterName])
554 self._padProbeLock.acquire()
555 self._startTimeProbeIds[eaterName] = pad.add_buffer_probe(
556 self._start_time_buffer_probe, eaterName)
557 self._padProbeLock.release()
558
560 self.debug("start time buffer probe for %s buf ts: %u",
561 eaterName, buffer.timestamp)
562 self._padProbeLock.acquire()
563 if eaterName in self._startTimeProbeIds:
564 self._startTimes[eaterName] = buffer.timestamp
565 pad.remove_buffer_probe(self._startTimeProbeIds[eaterName])
566 del self._startTimeProbeIds[eaterName]
567 self.debug("pad probe for %s", eaterName)
568 self._check_start_times_received()
569 self._padProbeLock.release()
570 return True
571
573 self.debug("here")
574 activeEater = self.uiState.get("active-eater")
575 haveAllStartTimes = True
576 lowestTs = 0
577 for eaterName in ["video-%s" % activeEater, "audio-%s" % activeEater]:
578 haveAllStartTimes = haveAllStartTimes and \
579 (eaterName in self._startTimes)
580 if eaterName in self._startTimes and \
581 (lowestTs == 0 or self._startTimes[eaterName] < lowestTs):
582 lowestTs = self._startTimes[eaterName]
583 self.debug("lowest ts received from buffer probes: %u",
584 lowestTs)
585
586 if haveAllStartTimes:
587 self.debug("have all start times")
588 self.videoSwitchElement.set_property("start-value", lowestTs)
589 self.audioSwitchElement.set_property("start-value", lowestTs)
590 self._startTimes = {}
591
592 self.audioSwitchElement.set_property("queue-buffers", False)
593 self.videoSwitchElement.set_property("queue-buffers", False)
594 self.log("eaterSwitchingTo becoming None from %s",
595 self.eaterSwitchingTo)
596 self.eaterSwitchingTo = None
597 self._switchLock.release()
598
600 Switch.eaterSetActive(self, feedId)
601 eaterName = self.get_eater_name_for_feed_id(feedId)
602 d = None
603 if "master" in eaterName and self.is_active("master"):
604 d = self._eaterReadyDefers["master"]
605 self._eaterReadyDefers["master"] = None
606 elif "backup" in eaterName and self.is_active("backup"):
607 d = self._eaterReadyDefers["backup"]
608 self._eaterReadyDefers["backup"] = None
609 if d:
610 d.callback(True)
611