Trees | Indices | Help |
---|
|
1 # -*- Mode: Python -*- 2 # vi:si:et:sw=4:sts=4:ts=4 3 # 4 # Flumotion - a streaming media server 5 # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 6 # All rights reserved. 7 8 # This file may be distributed and/or modified under the terms of 9 # the GNU General Public License version 2 as published by 10 # the Free Software Foundation. 11 # This file is distributed without any warranty; without even the implied 12 # warranty of merchantability or fitness for a particular purpose. 13 # See "LICENSE.GPL" in the source distribution for more information. 14 15 # Licensees having purchased or holding a valid Flumotion Advanced 16 # Streaming Server license may use this file in accordance with the 17 # Flumotion Advanced Streaming Server Commercial License Agreement. 18 # See "LICENSE.Flumotion" in the source distribution for more information. 19 20 # Headers in this file shall remain intact. 21 22 from flumotion.common import log 23 from flumotion.extern.fdpass import fdpass 24 25 from twisted.internet import unix, main, address, tcp 26 from twisted.spread import pb 27 28 import errno 29 import os 30 import socket 31 import struct 32 33 # Heavily based on 34 # http://twistedmatrix.com/trac/browser/sandbox/exarkun/copyover/server.py 35 # and client.py 36 # Thanks for the inspiration! 37 38 # Since we're doing this over a stream socket, our file descriptor messages 39 # aren't guaranteed to be received alone; they could arrive along with some 40 # unrelated data. 41 # So, we prefix the message with a 16 byte magic signature, and a length, 42 # and if we receive file descriptors decode based on this. 43 # 44 # map() instead of a string to workaround gettext encoding problems. 45 # 46 MAGIC_SIGNATURE = ''.join(map(chr, [253, 252, 142, 127, 7, 71, 185, 234, 47 161, 117, 238, 216, 220, 54, 200, 163])) 48 5355 transport = FDServer56 5860 if not self.connected: 61 return 62 try: 63 (fds, message) = fdpass.readfds(self.fileno(), 64 * 1024) 64 except socket.error, se: 65 if se.args[0] == errno.EWOULDBLOCK: 66 return 67 else: 68 return main.CONNECTION_LOST 69 else: 70 if not message: 71 return main.CONNECTION_DONE 72 73 if len(fds) > 0: 74 # Look for our magic cookie in (possibly) the midst of other 75 # data. Pass surrounding chunks, if any, onto dataReceived(), 76 # which (undocumentedly) must return None unless a failure 77 # occurred. 78 # Pass the actual FDs and their message to 79 # fileDescriptorsReceived() 80 offset = message.find(MAGIC_SIGNATURE) 81 if offset < 0: 82 # Old servers did not send this; be hopeful that this 83 # doesn't have bits of other protocol (i.e. PB) mixed up 84 # in it. 85 return self.protocol.fileDescriptorsReceived(fds, message) 86 elif offset > 0: 87 ret = self.protocol.dataReceived(message[0:offset]) 88 if ret: 89 return ret 90 91 msglen = struct.unpack("@I", message[offset+16:offset+20])[0] 92 offset += 20 93 ret = self.protocol.fileDescriptorsReceived(fds, 94 message[offset:offset+msglen]) 95 if ret: 96 return ret 97 98 if offset+msglen < len(message): 99 return self.protocol.dataReceived(message[offset+msglen:]) 100 return ret 101 else: 102 # self.debug("No FDs, passing to dataReceived") 103 return self.protocol.dataReceived(message)104 108110 """ 111 A pb.Broker subclass that handles FDs being passed to it (with associated 112 data) over the same connection as the normal PB data stream. 113 When an FD is seen, it creates new protocol objects for them from the 114 childFactory attribute. 115 """ 116 # FIXME: looks like we can only use our own subclasses that take 117 # three __init__ args160119 """ 120 @param connectionClass: a subclass of L{twisted.internet.tcp.Connection} 121 """ 122 pb.Broker.__init__(self, **kwargs) 123 124 self.childFactory = childFactory 125 self._connectionClass = connectionClass126 127 # This is the complex bit. If our underlying transport receives a file 128 # descriptor, this gets called - along with the data we got with the FD. 129 # We create an appropriate protocol object, and attach it to the reactor.131 if len(fds) == 1: 132 fd = fds[0] 133 134 # Note that we hardcode IPv4 here! 135 sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) 136 137 self.debug("Received FD %d->%d" % (fd, sock.fileno())) 138 139 # Undocumentedly (other than a comment in 140 # Python/Modules/socketmodule.c), socket.fromfd() calls dup() on 141 # the passed FD before it actually wraps it in a socket object. 142 # So, we need to close the FD that we originally had... 143 os.close(fd) 144 145 try: 146 peeraddr = sock.getpeername() 147 except socket.error: 148 self.info("Socket disconnected before being passed to client") 149 sock.close() 150 return 151 152 # Based on bits in tcp.Port.doRead() 153 addr = address._ServerFactoryIPv4Address('TCP', 154 peeraddr[0], peeraddr[1]) 155 protocol = self.childFactory.buildProtocol(addr) 156 157 self._connectionClass(sock, protocol, peeraddr, message) 158 else: 159 self.warning("Unexpected: FD-passing message with len(fds) != 1")162 keepSocketAlive = False 163176165 # We override this (from tcp._SocketCloser) so that we can close sockets 166 # properly in the normal case, but once we've passed our socket on via 167 # the FD-channel, we just close() it (not calling shutdown() which will 168 # close the TCP channel without closing the FD itself) 169 if self.keepSocketAlive: 170 try: 171 self.socket.close() 172 except socket.error: 173 pass 174 else: 175 tcp.Server._closeSocket(self)178 """ 179 A subclass of tcp.Server that permits passing the FDs used to other 180 processes (by just calling close(2) rather than shutdown(2) on them) 181 """ 182 pass183185 transport = PassableServerConnection186
Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Thu Aug 7 15:45:41 2008 | http://epydoc.sourceforge.net |