1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import errno
23 import os
24 import time
25 import tempfile
26 import datetime as dt
27
28 import gobject
29 import gst
30
31 from twisted.internet import reactor
32
33 from flumotion.component import feedcomponent
34 from flumotion.common import log, gstreamer, pygobject, messages, errors
35 from flumotion.common import documentation, format
36 from flumotion.common import eventcalendar, poller
37 from flumotion.common.i18n import N_, gettexter
38 from flumotion.common.mimetypes import mimeTypeToExtention
39 from flumotion.common.pygobject import gsignal
40
41
42
43
44
45
46
47
48
49 from flumotion.component.component import moods
50
51 __all__ = ['Disker']
52 __version__ = "$Rev$"
53 T_ = gettexter()
54
55
56 DISKPOLL_FREQ = 60
57
58
59 FILELIST_SIZE = 100
60
61 """
62 Disker has a property 'ical-schedule'. This allows an ical file to be
63 specified in the config and have recordings scheduled based on events.
64 This file will be monitored for changes and events reloaded if this
65 happens.
66
67 The filename of a recording started from an ical file will be produced
68 via passing the ical event summary through strftime, so that an archive
69 can encode the date and time that it was begun.
70
71 The time that will be given to strftime will be given in the timezone of
72 the ical event. In practice this will either be UTC or the local time of
73 the machine running the disker, as the ical scheduler does not
74 understand arbitrary timezones.
75 """
76
77
79
80
81
84
85
86
87
90
100
101
102
105
106
107 -class Disker(feedcomponent.ParseLaunchComponent, log.Loggable):
108 componentMediumClass = DiskerMedium
109 checkOffset = True
110 pipe_template = 'multifdsink sync-method=1 name=fdsink mode=1 sync=false'
111 file = None
112 directory = None
113 location = None
114 caps = None
115 last_tstamp = None
116
117 _startFilenameTemplate = None
118 _startTime = None
119 _rotateTimeDelayedCall = None
120 _pollDiskDC = None
121 _symlinkToLastRecording = None
122 _symlinkToCurrentRecording = None
123
124
125
126
127
128
129
146
147
148
162
174
176 props = self.config['properties']
177 rotateType = props.get('rotate-type', 'none')
178
179 if not rotateType in ['none', 'size', 'time']:
180 msg = messages.Error(T_(N_(
181 "The configuration property 'rotate-type' should be set to "
182 "'size', time', or 'none', not '%s'. "
183 "Please fix the configuration."),
184 rotateType), mid='rotate-type')
185 addMessage(msg)
186 raise errors.ConfigError(msg)
187
188 if rotateType in ['size', 'time']:
189 if rotateType not in props.keys():
190 msg = messages.Error(T_(N_(
191 "The configuration property '%s' should be set. "
192 "Please fix the configuration."),
193 rotateType), mid='rotate-type')
194 addMessage(msg)
195 raise errors.ConfigError(msg)
196
197 if props[rotateType] == 0:
198 msg = messages.Error(T_(N_("Configuration error: " \
199 "'rotate-type' %s value cannot be set to 0."),
200 rotateType), mid='rotate-type')
201 addMessage(msg)
202 raise errors.ConfigError(msg)
203
204
205
231
256
257 if not eventcalendar.HAS_ICALENDAR:
258 missingModule('icalendar')
259 if not eventcalendar.HAS_DATEUTIL:
260 missingModule('dateutil')
261
262 raise errors.ComponentSetupHandledError()
263
264 sink = self.get_element('fdsink')
265
266 if gstreamer.element_factory_has_property('multifdsink',
267 'resend-streamheader'):
268 sink.set_property('resend-streamheader', False)
269 else:
270 self.debug("resend-streamheader property not available, "
271 "resending streamheader when it changes in the caps")
272 sink.get_pad('sink').connect('notify::caps', self._notify_caps_cb)
273
274 sink.connect('client-removed', self._client_removed_cb)
275
276
277 react_to_marks = properties.get('react-to-stream-markers', False)
278 if react_to_marks:
279 pfx = properties.get('stream-marker-filename-prefix', '%03d.')
280 self._markerPrefix = pfx
281 sink.get_pad('sink').add_event_probe(self._markers_event_probe)
282
283
284
286
287
288 self._pollDiskDC = None
289 s = None
290 try:
291 s = os.statvfs(self.directory)
292 except Exception, e:
293 self.debug('failed to figure out disk space: %s',
294 log.getExceptionMessage(e))
295
296 if not s:
297 free = None
298 else:
299 free = format.formatStorage(s.f_frsize * s.f_bavail)
300
301 if self.uiState.get('disk-free') != free:
302 self.debug("disk usage changed, reporting to observers")
303 self.uiState.set('disk-free', free)
304
313
319
326
336
338 if self.caps:
339 return self.caps.get_structure(0).get_name()
340
341
342
344 mime = self.getMime()
345 if mime == 'multipart/x-mixed-replace':
346 mime += ";boundary=ThisRandomString"
347 return mime
348
380
382 """
383 @param filenameTemplate: strftime format string to decide filename
384 @param time: an aware datetime used for the filename and
385 to compare if an existing file needs to be
386 overwritten. defaulting to datetime.now().
387 """
388 mime = self.getMime()
389 ext = mimeTypeToExtention(mime)
390
391
392
393
394
395 tm = datetime or dt.datetime.now()
396 tmutc = datetime or dt.datetime.utcnow()
397
398 self.stopRecording()
399
400 sink = self.get_element('fdsink')
401 if sink.get_state() == gst.STATE_NULL:
402 sink.set_state(gst.STATE_READY)
403
404 filename = ""
405 if not filenameTemplate:
406 filenameTemplate = self._defaultFilenameTemplate
407 filename = "%s.%s" % (format.strftime(filenameTemplate,
408
409 tm.timetuple()), ext)
410 self.location = os.path.join(self.directory, filename)
411
412
413
414 location = self.location
415 i = 1
416 while os.path.exists(location):
417 mtimeTuple = time.gmtime(os.stat(location).st_mtime)
418
419
420 if mtimeTuple <= tmutc.utctimetuple():
421 self.info(
422 "Existing recording %s from previous event, overwriting",
423 location)
424 break
425
426 self.info(
427 "Existing recording %s from current event, changing name",
428 location)
429 location = self.location + '.' + str(i)
430 i += 1
431 self.location = location
432
433 self.info("Changing filename to %s", self.location)
434 try:
435 self.file = open(self.location, 'wb')
436 except IOError, e:
437 self.warning("Failed to open output file %s: %s",
438 self.location, log.getExceptionMessage(e))
439 m = messages.Error(T_(N_(
440 "Failed to open output file '%s' for writing. "
441 "Check permissions on the file."), self.location))
442 self.addMessage(m)
443 return
444 self._recordingStarted(self.file, self.location)
445 sink.emit('add', self.file.fileno())
446 self.last_tstamp = time.time()
447 self.uiState.set('filename', self.location)
448 self.uiState.set('recording', True)
449
450 if self._symlinkToCurrentRecording:
451 self._updateSymlink(self.location,
452 self._symlinkToCurrentRecording)
453
455 if not dest.startswith('/'):
456 dest = os.path.join(self.directory, dest)
457
458
459
460 self.debug("updating symbolic link %s to point to %s", dest, src)
461 try:
462 try:
463 os.symlink(src, dest)
464 except OSError, e:
465 if e.errno == errno.EEXIST and os.path.islink(dest):
466 os.unlink(dest)
467 os.symlink(src, dest)
468 else:
469 raise
470 except Exception, e:
471 self.info("Failed to update link %s: %s", dest,
472 log.getExceptionMessage(e))
473 m = messages.Warning(T_(N_("Failed to update symbolic link "
474 "'%s'. Check your permissions."), dest),
475 debug=log.getExceptionMessage(e))
476 self.addMessage(m)
477
479 sink = self.get_element('fdsink')
480 if sink.get_state() == gst.STATE_NULL:
481 sink.set_state(gst.STATE_READY)
482
483 if self.file:
484 self.file.flush()
485 sink.emit('remove', self.file.fileno())
486 self._recordingStopped(self.file, self.location)
487 self.file = None
488 self.uiState.set('filename', None)
489 self.uiState.set('recording', False)
490 try:
491 size = format.formatStorage(os.stat(self.location).st_size)
492 except EnvironmentError, e:
493
494 size = "unknown"
495
496
497 fl = self.uiState.get('filelist', otherwise=[])
498 if FILELIST_SIZE == len(fl):
499 self.uiState.remove('filelist', fl[0])
500
501 self.uiState.append('filelist', (self.last_tstamp,
502 self.location,
503 size))
504
505 if self._symlinkToLastRecording:
506 self._updateSymlink(self.location,
507 self._symlinkToLastRecording)
508
528
529
530
532
533
534 if client_status == 4:
535
536
537 reactor.callFromThread(self._client_error_cb)
538
549
555
560
562
563
564
565 current = self.uiState.get('next-points')[:]
566 points = self.icalScheduler.getPoints()
567 new = []
568
569
570
571
572 def _utcAndStripTZ(dt):
573 from flumotion.common import eventcalendar
574 return dt.astimezone(eventcalendar.UTC).replace(tzinfo=None)
575
576 for p in points:
577 dtUTC = _utcAndStripTZ(p.dt)
578 dtStart = p.eventInstance.start.replace(tzinfo=None)
579 new.append((dtUTC, p.which,
580 format.strftime(p.eventInstance.event.content,
581 dtStart.timetuple())))
582
583 for t in current:
584 if t not in new:
585 self.debug('removing tuple %r from next-points', t)
586 self.uiState.remove('next-points', t)
587
588 for t in new:
589 if t not in current:
590 self.debug('appending tuple %r to next-points', t)
591 self.uiState.append('next-points', t)
592
594 socket = 'flumotion.component.consumers.disker.disker_plug.DiskerPlug'
595
596 if socket not in self.plugs:
597 return
598 for plug in self.plugs[socket]:
599 self.debug('invoking recordingStarted on '
600 'plug %r on socket %s', plug, socket)
601 plug.recordingStarted(file, location)
602
604 socket = 'flumotion.component.consumers.disker.disker_plug.DiskerPlug'
605
606 if socket not in self.plugs:
607 return
608 for plug in self.plugs[socket]:
609 self.debug('invoking recordingStopped on '
610 'plug %r on socket %s', plug, socket)
611 plug.recordingStopped(file, location)
612
613
614
616 if event.type == gst.EVENT_CUSTOM_DOWNSTREAM:
617 evt_struct = event.get_structure()
618 if evt_struct.get_name() == 'FluStreamMark':
619 if evt_struct['action'] == 'start':
620 self._onMarkerStart(evt_struct['prog_id'])
621 elif evt_struct['action'] == 'stop':
622 self._onMarkerStop()
623 return True
624
627
629 tmpl = self._defaultFilenameTemplate
630 if self._markerPrefix:
631 try:
632 tmpl = '%s%s' % (self._markerPrefix % data,
633 self._defaultFilenameTemplate)
634 except TypeError, err:
635 m = messages.Warning(T_(N_('Failed expanding filename prefix: '
636 '%r <-- %r.'),
637 self._markerPrefix, data),
638 mid='expand-marker-prefix')
639 self.addMessage(m)
640 self.warning('Failed expanding filename prefix: '
641 '%r <-- %r; %r' %
642 (self._markerPrefix, data, err))
643 self.changeFilename(tmpl)
644
650