Package flumotion :: Package common :: Module worker
[hide private]

Source Code for Module flumotion.common.worker

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_common_worker -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  Objects related to the state of workers. 
 24  """ 
 25   
 26  import os 
 27  import signal 
 28   
 29  from twisted.spread import pb 
 30  from twisted.internet import protocol 
 31   
 32  from flumotion.twisted import flavors 
 33  from flumotion.common import log, errors, messages 
 34   
 35  from flumotion.common.messages import N_ 
 36  T_ = messages.gettexter('flumotion') 
 37   
38 -class ProcessProtocol(protocol.ProcessProtocol):
39 - def __init__(self, loggable, avatarId, processType, where):
40 self.loggable = loggable 41 self.avatarId = avatarId 42 self.processType = processType # e.g., 'component' 43 self.where = where # e.g., 'worker 1' 44 45 self.setPid(None)
46
47 - def setPid(self, pid):
48 self.pid = pid
49
50 - def sendMessage(self, message):
51 raise NotImplementedError
52
53 - def processEnded(self, status):
54 # vmethod implementation 55 # status is an instance of failure.Failure 56 # status.value is a twisted.internet.error.ProcessTerminated 57 # status.value.status is the os.WAIT-like status value 58 message = None 59 obj = self.loggable 60 pid = None 61 # if we have a pid, then set pid to string value of pid 62 # otherwise set to "unknown" 63 if self.pid: 64 pid = str(self.pid) 65 else: 66 pid = "unknown" 67 if status.value.exitCode is not None: 68 obj.info("Reaped child with pid %s, exit value %d.", 69 pid, status.value.exitCode) 70 signum = status.value.signal 71 72 # SIGKILL is an explicit kill, and never generates a core dump. 73 # For any other signal we want to see if there is a core dump, 74 # and warn if not. 75 if signum is not None: 76 if signum == signal.SIGKILL: 77 obj.warning("Child with pid %s killed.", pid) 78 message = messages.Error(T_(N_("The %s was killed.\n"), 79 self.processType)) 80 else: 81 message = messages.Error(T_(N_("The %s crashed.\n"), 82 self.processType), 83 debug='Terminated with signal number %d' % signum) 84 85 # use some custom logging depending on signal 86 if signum == signal.SIGSEGV: 87 obj.warning("Child with pid %s segfaulted.", pid) 88 elif signum == signal.SIGTRAP: 89 # SIGTRAP occurs when registry is corrupt 90 obj.warning("Child with pid %s received a SIGTRAP.", 91 pid) 92 else: 93 # if we find any of these, possibly special-case them too 94 obj.info("Reaped child with pid %s signaled by " 95 "signal %d.", pid, signum) 96 97 if not os.WCOREDUMP(status.value.status): 98 obj.warning("No core dump generated. " 99 "Were core dumps enabled at the start ?") 100 message.add(T_(N_( 101 "However, no core dump was generated. " 102 "You may need to configure the environment " 103 "if you want to further debug this problem."))) 104 else: 105 obj.info("Core dumped.") 106 corepath = os.path.join(os.getcwd(), 'core.%s' % pid) 107 if os.path.exists(corepath): 108 obj.info("Core file is probably '%s'." % corepath) 109 message.add(T_(N_( 110 "The core dump is '%s' on the host running '%s'."), 111 corepath, self.where)) 112 # FIXME: add an action that runs gdb and produces a 113 # backtrace; or produce it here and attach to the 114 # message as debug info. 115 116 if message: 117 obj.debug('sending message to manager/admin') 118 self.sendMessage(message) 119 120 self.setPid(None)
121
122 -class PortSet(log.Loggable):
123 """ 124 A list of ports that keeps track of which are available for use on a 125 given machine. 126 """ 127 # not very efficient mkay
128 - def __init__(self, logName, ports, randomPorts=False):
129 self.logName = logName 130 self.ports = ports 131 self.used = [0] * len(ports) 132 self.random = randomPorts
133
134 - def reservePorts(self, numPorts):
135 ret = [] 136 while numPorts > 0: 137 if self.random: 138 ret.append(0) 139 numPorts -= 1 140 continue 141 if not 0 in self.used: 142 raise errors.ComponentStartError( 143 'could not allocate port on worker %s' % self.logName) 144 i = self.used.index(0) 145 ret.append(self.ports[i]) 146 self.used[i] = 1 147 numPorts -= 1 148 return ret
149
150 - def setPortsUsed(self, ports):
151 for port in ports: 152 try: 153 i = self.ports.index(port) 154 except IndexError: 155 self.warning('portset does not include port %d', port) 156 else: 157 if self.used[i]: 158 self.warning('port %d already in use!', port) 159 else: 160 self.used[i] = 1
161
162 - def releasePorts(self, ports):
163 """ 164 @param ports: list of ports to release 165 @type ports: list of int 166 """ 167 for p in ports: 168 try: 169 i = self.ports.index(p) 170 if self.used[i]: 171 self.used[i] = 0 172 else: 173 self.warning('releasing unallocated port: %d' % p) 174 except ValueError: 175 self.warning('releasing unknown port: %d' % p)
176
177 - def numFree(self):
178 return len(self.ports) - self.numUsed()
179
180 - def numUsed(self):
181 return len(filter(None, self.used))
182 183 # worker heaven state proxy objects
184 -class ManagerWorkerHeavenState(flavors.StateCacheable):
185 """ 186 I represent the state of the worker heaven on the manager. 187 188 I have the following keys: 189 190 - names (list): list of worker names that we have state for 191 - workers (list): list of L{ManagerWorkerState} 192 """
193 - def __init__(self):
194 flavors.StateCacheable.__init__(self) 195 self.addListKey('names', []) 196 self.addListKey('workers', []) # should be a dict
197
198 - def __repr__(self):
199 return "%r" % self._dict
200
201 -class AdminWorkerHeavenState(flavors.StateRemoteCache):
202 """ 203 I represent the state of the worker heaven in the admin. 204 See L{ManagerWorkerHeavenState} 205 """ 206 pass
207 208 pb.setUnjellyableForClass(ManagerWorkerHeavenState, AdminWorkerHeavenState) 209
210 -class ManagerWorkerState(flavors.StateCacheable):
211 """ 212 I represent the state of a worker in the manager. 213 214 - name: name of the worker 215 - host: the IP address of the worker as seen by the manager 216 """
217 - def __init__(self, **kwargs):
218 flavors.StateCacheable.__init__(self) 219 self.addKey('name') 220 self.addKey('host') 221 for k, v in kwargs.items(): 222 self.set(k, v)
223
224 - def __repr__(self):
225 return ("<ManagerWorkerState for %s on %s>" 226 % (self.get('name'), self.get('host')))
227
228 -class AdminWorkerState(flavors.StateRemoteCache):
229 """ 230 I represent the state of a worker in the admin. 231 232 See L{ManagerWorkerState} 233 """ 234 pass
235 236 pb.setUnjellyableForClass(ManagerWorkerState, AdminWorkerState) 237