Package flumotion :: Package component :: Package base :: Module watcher
[hide private]

Source Code for Module flumotion.component.base.watcher

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22   
 23  import os 
 24  import time 
 25   
 26  from twisted.internet import reactor 
 27   
 28  from flumotion.common import log 
 29   
class BaseWatcher(log.Loggable):
    """I watch for file changes.

    I am a base class for a file watcher. I can be specialized to watch
    any set of files: subclasses must implement L{getFilesToStat} and may
    override L{getFileData} and L{isNewFileStable}.
    """

    def __init__(self, timeout):
        """Make a file watcher object.

        @param timeout: timeout between checks, in seconds
        @type  timeout: int
        """
        self.timeout = timeout
        self._reset()
        # Counter used to hand out subscription IDs; never reused.
        self._subscribeId = 0
        # subscription ID -> {event name: callable or None}
        self.subscribers = {}

    def _reset(self):
        """Forget all tracked file state and any scheduled check."""
        # filename -> DATA for files whose last change has settled
        self._stableData = {}
        # filename -> DATA for files seen changing on the previous check
        self._changingData = {}
        # pending reactor call for the next check, None when not running
        self._delayedCall = None

    def _subscribe(self, **events):
        """Subscribe to events.

        @param events: The events to subscribe to. Subclasses are
                       expected to formalize this dict, specifying which
                       events they support via declaring their kwargs
                       explicitly.

        @returns: A subscription ID that can later be passed to
                  unsubscribe().
        """
        sid = self._subscribeId
        self._subscribeId += 1
        self.subscribers[sid] = events
        return sid

    def subscribe(self, fileChanged=None, fileDeleted=None):
        """Subscribe to events.

        @param fileChanged: A function to call when a file changes. This
                            function will only be called if the file's
                            details (size, mtime) do not change during
                            the timeout period.
        @type  fileChanged: filename -> None
        @param fileDeleted: A function to call when a file is deleted.
        @type  fileDeleted: filename -> None

        @returns: A subscription ID that can later be passed to
                  unsubscribe().
        """
        return self._subscribe(fileChanged=fileChanged,
                               fileDeleted=fileDeleted)

    # The parameter is named 'id' (shadowing the builtin) because it is
    # part of the public signature; renaming it would break keyword callers.
    def unsubscribe(self, id):
        """Unsubscribe from file change notifications.

        @param id: Subscription ID received from subscribe()

        @raises KeyError: if the ID is not currently subscribed.
        """
        del self.subscribers[id]

    def event(self, event, *args, **kwargs):
        """Fire an event.

        This method is intended for use by object implementations.

        Every subscription is expected to contain a key for the event; a
        subscription without one raises KeyError. Subscriptions whose
        handler for the event is None (or otherwise false) are skipped.

        @param event: name of the event to fire, e.g. 'fileChanged'
        @type  event: str
        """
        # Iterate over a snapshot: a handler may call subscribe() or
        # unsubscribe(), and mutating the dict while iterating its live
        # view raises RuntimeError on Python 3.
        for subscription in list(self.subscribers.values()):
            handler = subscription[event]
            if handler:
                handler(*args, **kwargs)

    # FIXME: this API has tripped up two people thus far, including its
    # author. make subscribe() call start() if necessary?
    def start(self):
        """Start checking for file changes.

        Subscribers will be notified asynchronously of changes to the
        watched files. The first check runs immediately, inside this
        call; subsequent checks are scheduled on the reactor every
        C{self.timeout} seconds.
        """

        def checkFiles():
            self.log("checking for file changes")
            new = self.getFileData()
            changing = self._changingData
            stable = self._stableData
            for f in new:
                if f not in changing:
                    if f not in stable and self.isNewFileStable(f, new[f]):
                        self.debug('file %s stable when noted', f)
                        stable[f] = new[f]
                        self.event('fileChanged', f)
                    elif f in stable and new[f] == stable[f]:
                        # no change
                        pass
                    else:
                        self.debug('change start noted for %s', f)
                        changing[f] = new[f]
                else:
                    if new[f] == changing[f]:
                        # Data identical across two consecutive checks:
                        # the change has settled.
                        self.debug('change finished for %s', f)
                        del changing[f]
                        stable[f] = new[f]
                        self.event('fileChanged', f)
                    else:
                        self.log('change continues for %s', f)
                        changing[f] = new[f]
            # Snapshot the keys before deleting: removing entries while
            # iterating the dict itself fails on Python 3.
            for f in list(stable):
                if f not in new:
                    # deletion
                    del stable[f]
                    self.debug('file %s has been deleted', f)
                    self.event('fileDeleted', f)
            for f in list(changing):
                if f not in new:
                    self.debug('file %s has been deleted', f)
                    del changing[f]
            self._delayedCall = reactor.callLater(self.timeout,
                                                  checkFiles)

        assert self._delayedCall is None
        checkFiles()

    def stop(self):
        """Stop checking for file changes.

        Cancels the pending check and forgets all tracked file state.
        Must only be called while a check is scheduled (i.e. after
        L{start}); otherwise there is no delayed call to cancel and the
        attempt raises an exception.
        """
        self._delayedCall.cancel()
        self._reset()

    def getFileData(self):
        """
        Collect the current DATA for every file to watch.

        Files that cannot be stat'ed are logged and left out of the
        result, which is how deleted files are detected by L{start}.

        @returns: a dict, {filename => DATA}
                  DATA can be anything. In the default implementation it
                  is a pair of (mtime, size).
        """
        ret = {}
        for f in self.getFilesToStat():
            try:
                st = os.stat(f)
            except OSError as e:
                self.debug('could not read file %s: %s', f,
                           log.getExceptionMessage(e))
            else:
                ret[f] = (st.st_mtime, st.st_size)
        return ret

    def isNewFileStable(self, fName, fData):
        """
        Check if the file is already stable when being added to the
        set of watched files.

        The default implementation treats a file as stable when its
        mtime lies more than C{self.timeout} seconds in the past.

        @param fName: filename
        @type  fName: str
        @param fData: DATA, as returned by L{getFileData} method. In
                      the default implementation it is a pair of
                      (mtime, size).

        @rtype: bool
        """
        # Directive for the pychecker tool: fName is intentionally unused.
        __pychecker__ = 'unusednames=fName'

        return fData[0] + self.timeout < time.time()

    def getFilesToStat(self):
        """
        List the files to check. Must be implemented by subclasses.

        @returns: sequence of filename
        """
        raise NotImplementedError
195
class DirectoryWatcher(BaseWatcher):
    """
    Directory Watcher
    Watches a directory for new files.
    """

    def __init__(self, path, ignorefiles=(), timeout=30):
        """
        @param path:        directory whose entries are watched
        @type  path:        str
        @param ignorefiles: entry names (as returned by os.listdir, not
                            full paths) to leave out of the watched set
        @type  ignorefiles: sequence of str
        @param timeout:     timeout between checks, in seconds
        @type  timeout:     int
        """
        BaseWatcher.__init__(self, timeout)
        self.path = path
        self._ignorefiles = ignorefiles

    def getFilesToStat(self):
        """
        List the directory afresh and return the entries to check.

        @returns: list of paths, each entry name joined onto self.path,
                  skipping names found in the ignore list
        """
        watched = []
        for entry in os.listdir(self.path):
            if entry in self._ignorefiles:
                continue
            watched.append(os.path.join(self.path, entry))
        return watched
211
class FilesWatcher(BaseWatcher):
    """
    Watches a collection of files for modifications.
    """

    def __init__(self, files, timeout=30):
        """
        @param files:   the filenames to watch; the object is stored as
                        given, not copied
        @type  files:   sequence of str
        @param timeout: timeout between checks, in seconds
        @type  timeout: int
        """
        BaseWatcher.__init__(self, timeout)
        self._files = files

    def getFilesToStat(self):
        """
        @returns: the sequence of filenames passed to the constructor
        """
        watched = self._files
        return watched
223