1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import time
23
24 import gst
25
26 from twisted.internet import reactor
27
28 from flumotion.common import componentui
29
30 __version__ = "$Rev$"
31
32
# NOTE(review): the reviewed listing had lost its indentation and every
# ``class``/``def`` header line.  The class name comes from the string
# built for the textual representation and the constructor argument from
# the body; the remaining method names and signatures were reconstructed
# from the docstrings -- confirm them against the repository before merging.
class Feeder:
    """
    This class groups feeder-related information as used by a Feed Component.

    @ivar feederName:  name of the feeder
    @ivar elementName: C{'feeder:'} followed by the feeder name
    @ivar payName:     the element name followed by C{'-pay'}
    @ivar uiState:     the serializable UI State for this feeder
    """

    def __init__(self, feederName):
        """
        @param feederName: name of the feeder
        """
        self.feederName = feederName
        self.elementName = 'feeder:' + feederName
        self.payName = self.elementName + '-pay'

        self.uiState = componentui.WorkerComponentUIState()
        self.uiState.addKey('feederName')
        self.uiState.set('feederName', feederName)
        self.uiState.addListKey('clients')

        # fd -> (FeederClient, cleanup callable)
        self._fdToClient = {}
        # client id -> FeederClient; kept after the client disconnects
        self._clients = {}

    def __repr__(self):
        return ('<Feeder %s (%d client(s))>'
                % (self.feederName, len(self._clients)))

    def clientConnected(self, clientId, fd, cleanup):
        """
        The given client has connected on the given file descriptor, and is
        being added to multifdsink. This is called solely from the reactor
        thread.

        @param clientId: id of the client of the feeder
        @param fd:       file descriptor representing the client
        @param cleanup:  callable to be called when the given fd is removed

        @return: the client object tracking this client id
        @rtype:  L{FeederClient}
        """
        client = self._clients.get(clientId)
        if client is None:
            # First time this client id is seen: create its tracking object
            # and expose its UI state through ours.
            client = FeederClient(clientId)
            self._clients[clientId] = client
            self.uiState.append('clients', client.uiState)

        self._fdToClient[fd] = (client, cleanup)
        client.connected(fd)

        return client

    def clientDisconnected(self, fd):
        """
        The client has been entirely removed from multifdsink, and we may
        now close its file descriptor.
        The client object stays around so we can track over multiple
        connections.

        Called from GStreamer threads.

        @param fd: file descriptor the client was connected on
        @type  fd: file descriptor

        @raise KeyError: if fd was not registered through the connect call
        """
        (client, cleanup) = self._fdToClient.pop(fd)
        client.disconnected(fd=fd)

        # We are not in the reactor thread here, so the cleanup callable is
        # handed over to the reactor instead of being invoked directly.
        reactor.callFromThread(cleanup, fd)

    def getClients(self):
        """
        @return: all L{FeederClient}s ever seen, including currently
                 disconnected clients
        @rtype:  list of L{FeederClient}
        """
        # Build a real list: on Python 3 dict.values() is a live view, which
        # is not what the documented return type promises.
        return list(self._clients.values())
104
105
# NOTE(review): the reviewed listing had lost its indentation and every
# ``class``/``def`` header line.  The class name, the constructor argument
# and the names ``connected``, ``disconnected`` and
# ``_updateUIStateForDisconnect`` are taken from visible call sites; the
# name ``setStats`` and the exact parameter defaults are reconstructed --
# confirm them against the repository before merging.
class FeederClient:
    """
    This class groups information related to the client of a feeder.
    The client is identified by an id.
    The information remains valid for the lifetime of the feeder, so it
    can track reconnects of the client.

    @ivar clientId: id of the client of the feeder
    @ivar fd:       file descriptor the client is currently using, or None.
    @ivar uiState:  the serializable UI State for this client
    """

    def __init__(self, clientId):
        """
        @param clientId: id of the client of the feeder
        """
        # The class docstring documents this attribute, so actually set it.
        self.clientId = clientId

        self.uiState = componentui.WorkerComponentUIState()
        self.uiState.addKey('client-id', clientId)
        self.fd = None
        self.uiState.addKey('fd', None)

        # Counters and timestamps start out at 0.
        for key in (
            'bytes-read-current',
            'bytes-read-total',
            'reconnects',
            'last-connect',
            'last-disconnect',
            'last-activity',
            ):
            self.uiState.addKey(key, 0)

        # Drop counters start out as None until stats have been received.
        for key in (
            'buffers-dropped-current',
            'buffers-dropped-total',
            ):
            self.uiState.addKey(key, None)

        # Totals accumulated over connections that have already ended.
        self._buffersDroppedBefore = 0
        self._bytesReadBefore = 0

    def setStats(self, stats):
        """
        Update the UI state from a stats sequence for the current connection.

        Index 0 is used as the number of bytes sent, index 4 as the time of
        last activity (divided by C{gst.SECOND}; presumably nanoseconds --
        TODO confirm) and, when present, index 5 as the number of dropped
        buffers.

        @type stats: list
        """
        bytesSent = stats[0]
        timeLastActivity = float(stats[4]) / gst.SECOND
        if len(stats) > 5:
            buffersDropped = stats[5]
        else:
            # No drop counter was supplied; report 0.
            buffersDropped = 0

        self.uiState.set('bytes-read-current', bytesSent)
        self.uiState.set('buffers-dropped-current', buffersDropped)
        self.uiState.set('bytes-read-total', self._bytesReadBefore + bytesSent)
        self.uiState.set('last-activity', timeLastActivity)
        if buffersDropped is not None:
            self.uiState.set('buffers-dropped-total',
                self._buffersDroppedBefore + buffersDropped)

    def connected(self, fd, when=None):
        """
        The client has connected on this fd.
        Update related stats.

        Called only from the reactor thread.

        @param fd:   file descriptor the client connected on
        @param when: time of the connection; defaults to the current time
        """
        if when is None:
            when = time.time()

        # Compare against None: 0 is a valid file descriptor and must not be
        # mistaken for "not connected".
        if self.fd is not None:
            # A previous connection is still recorded (presumably its
            # disconnect has not been processed yet); account for it first
            # so its counters are folded into the totals.
            self._updateUIStateForDisconnect(self.fd, when)

        self.fd = fd
        self.uiState.set('fd', fd)
        self.uiState.set('last-connect', when)
        self.uiState.set('reconnects', self.uiState.get('reconnects', 0) + 1)

    def _updateUIStateForDisconnect(self, fd, when):
        """
        Record a disconnect at the given time and fold the current counters
        into the running totals.

        @param fd:   file descriptor that was disconnected
        @param when: time of the disconnect
        """
        if self.fd == fd:
            self.fd = None
            self.uiState.set('fd', None)
        # NOTE(review): nesting reconstructed from a flattened listing; this
        # statement and the counter updates below are taken to be outside
        # the fd check -- confirm.
        self.uiState.set('last-disconnect', when)

        # Move the per-connection counters into the totals, then reset them.
        self._bytesReadBefore += self.uiState.get('bytes-read-current')
        self.uiState.set('bytes-read-current', 0)
        if self.uiState.get('buffers-dropped-current') is not None:
            self._buffersDroppedBefore += self.uiState.get(
                'buffers-dropped-current')
            self.uiState.set('buffers-dropped-current', 0)

    def disconnected(self, when=None, fd=None):
        """
        The client has disconnected.
        Update related stats.

        Called from GStreamer threads.

        @param when: time of the disconnect; defaults to the current time
        @param fd:   file descriptor that was disconnected
        """
        if self.fd != fd:
            # Not the descriptor currently recorded for this client, so
            # there is nothing to update.
            return

        if when is None:
            when = time.time()

        # We are not in the reactor thread; let the reactor apply the
        # UI state changes.
        reactor.callFromThread(self._updateUIStateForDisconnect, fd,
            when)
223