Package flumotion :: Package common :: Module planet
[hide private]

Source Code for Module flumotion.common.planet

  1  # -*- Mode: Python; -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  Serializable objects from worker through manager to admin for 
 24  planet, flow, job and component. 
 25  """ 
 26   
 27  from twisted.spread import pb 
 28  from twisted.internet import defer 
 29  from zope.interface import implements 
 30   
 31  from flumotion.twisted import flavors 
 32  from flumotion.common import enum, log 
 33   
34 -class ManagerPlanetState(flavors.StateCacheable):
35 """ 36 I represent the state of a planet in the manager. 37 38 I have the following keys: 39 40 - name 41 - manager 42 - atmosphere: L{ManagerAtmosphereState} 43 - flows (list): list of L{ManagerFlowState} 44 """ 45 # FIXME: why is there a 'parent' key ?
46 - def __init__(self):
47 flavors.StateCacheable.__init__(self) 48 self.addKey('name') 49 self.addKey('parent') 50 self.addKey('manager') 51 self.addKey('atmosphere') 52 self.addListKey('flows') 53 self.addDictKey('messages') 54 55 # we always have at least one atmosphere 56 self.set('atmosphere', ManagerAtmosphereState()) 57 self.get('atmosphere').set('parent', self)
58
59 - def getComponents(self):
60 """ 61 Return a list of all component states in this planet 62 (from atmosphere and all flows). 63 64 @rtype: list of L{ManagerComponentState} 65 """ 66 list = [] 67 68 a = self.get('atmosphere') 69 if a: 70 list.extend(a.get('components')) 71 72 flows = self.get('flows') 73 if flows: 74 for flow in flows: 75 list.extend(flow.get('components')) 76 77 return list
78 79
80 -class AdminPlanetState(flavors.StateRemoteCache):
81 """ 82 I represent the state of a planet in an admin client. 83 See L{ManagerPlanetState}. 84 """
85 - def invalidate(self):
86 for flow in self.get('flows'): 87 flow.invalidate() 88 89 self.get('atmosphere').invalidate() 90 91 flavors.StateRemoteCache.invalidate(self)
92 93 pb.setUnjellyableForClass(ManagerPlanetState, AdminPlanetState) 94
95 -class ManagerAtmosphereState(flavors.StateCacheable):
96 """ 97 I represent the state of an atmosphere in the manager. 98 The atmosphere contains components that do not participate in a flow, 99 but provide services to flow components. 100 101 I have the following keys: 102 103 - name: string, "atmosphere" 104 - parent: L{ManagerPlanetState} 105 - components (list): list of L{ManagerComponentState} 106 """ 107
108 - def __init__(self):
109 flavors.StateCacheable.__init__(self) 110 self.addKey('parent') 111 self.addListKey('components') 112 self.addKey('name') 113 self.set('name', 'atmosphere')
114
115 - def empty(self):
116 """ 117 Clear out all component entries. 118 119 @returns: a DeferredList that will fire when all notifications are done. 120 """ 121 list = [self.remove('components', c) for c in self.get('components')] 122 return defer.DeferredList(list)
123
124 -class AdminAtmosphereState(flavors.StateRemoteCache):
125 """ 126 I represent the state of an atmosphere in an admin client. 127 See L{ManagerAtmosphereState}. 128 """
129 - def invalidate(self):
130 for component in self.get('components'): 131 component.invalidate() 132 133 flavors.StateRemoteCache.invalidate(self)
134 135 pb.setUnjellyableForClass(ManagerAtmosphereState, AdminAtmosphereState) 136
137 -class ManagerFlowState(flavors.StateCacheable):
138 """ 139 I represent the state of a flow in the manager. 140 141 I have the following keys: 142 143 - name: string, name of the flow 144 - parent: L{ManagerPlanetState} 145 - components (list): list of L{ManagerComponentState} 146 """
147 - def __init__(self, **kwargs):
148 """ 149 ManagerFlowState constructor. Any keyword arguments are 150 intepreted as initial key-value pairs to set on the new 151 ManagerFlowState. 152 """ 153 flavors.StateCacheable.__init__(self) 154 self.addKey('name') 155 self.addKey('parent') 156 self.addListKey('components') 157 for k, v in kwargs.items(): 158 self.set(k, v)
159
160 - def empty(self):
161 """ 162 Clear out all component entries 163 """ 164 # take a copy of the list because we're modifying while running 165 components = self.get('components')[:] 166 167 list = [self.remove('components', c) for c in components] 168 return defer.DeferredList(list)
169
170 -class AdminFlowState(flavors.StateRemoteCache):
171 """ 172 I represent the state of a flow in an admin client. 173 See L{ManagerFlowState}. 174 """
175 - def invalidate(self):
176 for component in self.get('components'): 177 component.invalidate() 178 179 flavors.StateRemoteCache.invalidate(self)
180 181 pb.setUnjellyableForClass(ManagerFlowState, AdminFlowState) 182 183 # moods 184 # FIXME. make epydoc like this 185 """ 186 @cvar moods: an enum representing the mood a component can be in. 187 """ 188 moods = enum.EnumClass( 189 'Moods', 190 ('happy', 'hungry', 'waking', 'sleeping', 'lost', 'sad') 191 ) 192 moods.can_stop = staticmethod(lambda m: m != moods.sleeping) 193 moods.can_start = staticmethod(lambda m: m == moods.sleeping) 194 195 _jobStateKeys = ['mood', 'manager-ip', 'pid', 'workerName'] 196 _jobStateListKeys = ['messages', ] 197 198 # FIXME: maybe make Atmosphere and Flow subclass from a ComponentGroup class ?
199 -class ManagerComponentState(flavors.StateCacheable):
200 """ 201 I represent the state of a component in the manager. 202 I have my own state, and also proxy state from the L{ManagerJobState} 203 when the component is actually created in a worker. 204 205 I have the following keys of my own: 206 207 - name: str, name of the component, unique in the parent 208 - parent: L{ManagerFlowState} or L{ManagerAtmosphereState} 209 - type: str, type of the component 210 - moodPending: int, the mood value the component is being set to 211 - workerRequested: str, name of the worker this component is 212 requested to be started on. 213 - config: dict, the configuration dict for this component 214 215 It also has a special key, 'mood'. This acts as a proxy for the mood 216 in the L{WorkerJobState}, when there is a job attached (the job's copy 217 is authoritative when it connects), and is controlled independently at 218 other times. 219 220 I proxy the following keys from the serialized L{WorkerJobState}: 221 - mood, manager-ip, pid, workerName 222 - messages (list) 223 """ 224
225 - def __init__(self):
226 flavors.StateCacheable.__init__(self) 227 # our additional keys 228 self.addKey('name') 229 self.addKey('type') 230 self.addKey('parent') 231 self.addKey('moodPending') 232 self.addKey('workerRequested') 233 self.addKey('config') # dictionary 234 235 # proxied from job state or combined with our state (mood) 236 for k in _jobStateKeys: 237 self.addKey(k) 238 for k in _jobStateListKeys: 239 self.addListKey(k) 240 self._jobState = None
241
242 - def __repr__(self):
243 return "<ManagerComponentState %s>" % self._dict['name']
244
245 - def setJobState(self, jobState):
246 """ 247 Set the job state I proxy from. 248 249 @type jobState: L{ManagerJobState} 250 """ 251 self._jobState = jobState 252 for key in _jobStateKeys: 253 # only set non-None values 254 if key == 'mood': 255 continue 256 v = jobState.get(key) 257 if v != None: 258 self.set(key, v) 259 for key in _jobStateListKeys: 260 list = jobState.get(key) 261 if list != None: 262 for v in list: 263 self.append(key, v) 264 # set mood last; see #552 265 self.set('mood', jobState.get('mood')) 266 267 # only proxy keys we want proxied; eaterNames and feederNames 268 # are ignored for example 269 proxiedKeys = _jobStateKeys + _jobStateListKeys 270 def proxy(attr): 271 def event(state, key, value): 272 if key in proxiedKeys: 273 getattr(self, attr)(key, value)
274 return event
275 276 jobState.addListener(self, proxy('set'), proxy('append'), 277 proxy('remove')) 278
279 - def set(self, key, value):
280 # extend set so we can log mood changes 281 if key == 'mood': 282 log.info('componentstate', 'mood of %s changed to %s', 283 self.get('name'), moods.get(value).name) 284 flavors.StateCacheable.set(self, key, value) 285 if key == 'mood' and value == self.get('moodPending'): 286 # we have reached our pending mood 287 self.set('moodPending', None)
288
289 - def setMood(self, moodValue):
290 if self._jobState and moodValue != moods.sad.value: 291 log.warning('componentstate', 'cannot set component mood to ' 292 'something other than sad when we have a ' 293 'jobState -- fix your code!') 294 elif moodValue == self.get('mood'): 295 log.log('componentstate', '%s already in mood %d', 296 self.get('name'), moodValue) 297 else: 298 log.debug('componentstate', 'manager sets mood of %s from %s to %d', 299 self.get('name'), self.get('mood'), moodValue) 300 self.set('mood', moodValue)
301
302 - def clearJobState(self, shutdownRequested):
303 """ 304 Remove the job state. 305 """ 306 # Clear messages proxied from job 307 for m in self._jobState.get('messages'): 308 self.remove('messages', m) 309 310 self._jobState.removeListener(self) 311 self._jobState = None 312 313 # Clearing a job state means that a component logged out. If we 314 # were sad, leave the mood as it is. Otherwise if shut down due 315 # to an explicit manager request, go to sleeping. Otherwise, go 316 # to lost, because it got disconnected for an unknown reason 317 # (probably network related) 318 if self.get('mood') != moods.sad.value: 319 if shutdownRequested: 320 log.debug('componentstate', "Shutdown was requested, %s" 321 " now sleeping", self.get('name')) 322 self.setMood(moods.sleeping.value) 323 else: 324 log.debug('componentstate', "Shutdown was NOT requested," 325 " %s now lost", self.get('name')) 326 self.setMood(moods.lost.value)
327
328 -class AdminComponentState(flavors.StateRemoteCache):
329 """ 330 I represent the state of a component in the admin client. 331 See L{ManagerComponentState}. 332 """
333 - def __repr__(self):
334 return "<AdminComponentState %s>" % self._dict['name']
335 336 pb.setUnjellyableForClass(ManagerComponentState, AdminComponentState) 337 338 # state of an existing component running in a job process 339 # exchanged between worker and manager
340 -class WorkerJobState(flavors.StateCacheable):
341 """ 342 I represent the state of a job in the worker, running a component. 343 344 I have the following keys: 345 346 - mood: int, value of the mood this component is in 347 - ip: string, IP address of the worker 348 - pid: int, PID of the job process 349 - workerName: string, name of the worker I'm running on 350 - messages: list of L{flumotion.common.messages.Message} 351 352 In addition, if I am the state of a FeedComponent, then I also 353 have the following keys: 354 355 - eaterNames: list of feedId being eaten by the eaters 356 - feederNames: list of feedId being fed by the feeders 357 358 @todo: change eaterNames and feederNames to eaterFeedIds and ... 359 """
360 - def __init__(self):
361 flavors.StateCacheable.__init__(self) 362 for k in _jobStateKeys: 363 self.addKey(k) 364 for k in _jobStateListKeys: 365 self.addListKey(k)
366
367 -class ManagerJobState(flavors.StateRemoteCache):
368 """ 369 I represent the state of a job in the manager. 370 See L{WorkerJobState}. 371 """ 372 pass
373 374 pb.setUnjellyableForClass(WorkerJobState, ManagerJobState) 375