Package flumotion :: Package common :: Module common
[hide private]

Source Code for Module flumotion.common.common

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_common -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  small common functions used by all processes 
 24  """ 
 25   
 26  import errno 
 27  import os 
 28  import sys 
 29  import time 
 30  import signal 
 31  import locale 
 32   
 33  from twisted.internet import address 
 34   
 35  from flumotion.common import log 
 36   
 37  # Note: This module is loaded very early on, so 
 38  #       don't add any extra flumotion imports unless you 
 39  #       really know what you're doing. 
 40  from flumotion.configure import configure 
 41   
def formatStorage(units, precision=2):
    """
    Format a storage size with an SI prefix.

    The value is scaled by powers of 1000 (not 1024) and tagged with one
    of the prefixes k, M, G, T, P or E.  The result ends in the bare
    prefix, so the caller is expected to append the unit itself ("bit" or
    "byte", rather than "b" or "B").

    @param units:     the size to format
    @type  units:     int or float
    @param precision: the number of digits to show after the decimal point
    @type  precision: int

    @rtype:   string
    @returns: the scaled value, a space, and the SI prefix (empty for
              values below 1000)
    """
    # float() is sensitive to a non-C LC_NUMERIC on python 2.3 only
    # (see PEP 331), so force the C numeric locale there first.
    if sys.version_info < (2, 4):
        locale.setlocale(locale.LC_NUMERIC, "C")

    scale = ('', 'k', 'M', 'G', 'T', 'P', 'E')
    largest = len(scale) - 1

    amount = float(units)
    step = 0
    # stop at the largest known prefix, however big the value still is
    while step < largest and amount >= 1000:
        amount /= 1000
        step += 1

    template = "%%.%df %%s" % precision
    return template % (amount, scale[step])
75
def formatTime(seconds, fractional=0):
    """
    Nicely format a duration in a human-readable format.

    The duration is split into weeks, days, hours and minutes.  Weeks and
    days are only mentioned when non-zero; hours and minutes are always
    shown as HH:MM.  The remaining seconds are only shown when
    C{fractional} is greater than zero.

    @param seconds:    the time in seconds to format.
    @type  seconds:    int or float
    @param fractional: how many digits to show for the fractional part of
                       the seconds; 0 omits the seconds altogether.
    @type  fractional: int

    @rtype:   string
    @returns: a nicely formatted time string.
    """
    chunks = []

    # divmod() floors, so every count is a whole number even for float
    # input.  With plain "/" a float (or any value on Python 3) gives
    # fractional counts, and e.g. 1.2 weeks was rendered as "1 weeks".
    weeks, seconds = divmod(seconds, 60 * 60 * 24 * 7)
    days, seconds = divmod(seconds, 60 * 60 * 24)
    hours, seconds = divmod(seconds, 60 * 60)
    minutes, seconds = divmod(seconds, 60)

    if weeks > 1:
        chunks.append('%d weeks' % weeks)
    elif weeks == 1:
        chunks.append('1 week')

    if days > 1:
        chunks.append('%d days' % days)
    elif days == 1:
        chunks.append('1 day')

    chunk = '%02d:%02d' % (hours, minutes)
    if fractional > 0:
        # width covers two integer digits, the decimal point and the
        # requested number of fractional digits
        chunk += ':%0*.*f' % (fractional + 3, fractional, seconds)

    chunks.append(chunk)

    return " ".join(chunks)
125
def formatTimeStamp(timeOrTuple):
    """
    Render a timestamp as "YYYY-MM-DD HH:MM" followed by the timezone name.

    @param timeOrTuple: the timestamp to format
    @type  timeOrTuple: anything accepted as second argument by
                        time.strftime

    @rtype:   string
    @returns: the formatted timestamp.
    """
    layout = "%Y-%m-%d %H:%M %Z"
    return time.strftime(layout, timeOrTuple)
137
def version(binary):
    """
    Build the version block shown by the flumotion binaries.

    The block is returned, not printed; it consists of three lines, the
    first of which names the binary and configure.version.

    @arg  binary: name of the binary
    @type binary: string

    @rtype:   string
    @returns: the newline-separated version block
    """
    lines = (
        "%s %s" % (binary, configure.version),
        "part of Flumotion - a streaming media server",
        "(C) Copyright 2004,2005,2006,2007 Fluendo",
    )
    return "\n".join(lines)
151
def mergeImplements(*classes):
    """
    Merge the __implemented__ entries of the given classes into one tuple.

    Classes are processed in argument order and duplicates are kept.
    A class for which reading or iterating __implemented__ raises
    AttributeError contributes nothing.

    @rtype: tuple
    """
    merged = []
    for klass in classes:
        try:
            merged.extend(list(klass.__implemented__))
        except AttributeError:
            # with twisted 2.0.1, a simple "class C: pass" does not have
            # C.__implemented__
            continue
    return tuple(merged)
168
def daemonize(stdin='/dev/null', stdout='/dev/null', stderr='/dev/null',
              directory='/'):
    '''
    This forks the current process into a daemon, using two forks with a
    chdir/umask/setsid in between.

    The stdin, stdout, and stderr arguments are file names that
    will be opened and be used to replace the standard file descriptors
    in sys.stdin, sys.stdout, and sys.stderr.
    These arguments are optional and default to /dev/null.

    The fork will switch to the given directory.

    Both parent processes leave through sys.exit(0); only the final child
    returns from this function.  A failing fork writes a message to
    sys.stderr and exits with status 1.

    @param stdin:     name of the file to use as standard input
    @type  stdin:     str
    @param stdout:    name of the file passed to log.outputToFiles as
                      standard output
    @type  stdout:    str
    @param stderr:    name of the file passed to log.outputToFiles as
                      standard error
    @type  stderr:    str
    @param directory: directory to change to after the first fork
    @type  directory: str

    @raise flumotion.common.errors.SystemError: if changing to the
        directory fails
    '''
    # Redirect standard file descriptors.
    si = open(stdin, 'r')
    os.dup2(si.fileno(), sys.stdin.fileno())
    try:
        log.outputToFiles(stdout, stderr)
    except IOError, e:
        # NOTE(review): only EACCES is reported; any other IOError is
        # silently dropped here -- confirm that this is intended
        if e.errno == errno.EACCES:
            log.error('common', 'Permission denied writing to log file %s.',
                e.filename)

    # first fork
    try:
        pid = os.fork()
        if pid > 0:
            sys.exit(0) # exit first parent
    except OSError, e:
        sys.stderr.write("Failed to fork: (%d) %s\n" % (e.errno, e.strerror))
        sys.exit(1)

    # decouple from parent environment
    try:
        os.chdir(directory)
    except OSError, e:
        # imported here rather than at module level; see the note on
        # flumotion imports at the top of this file
        from flumotion.common import errors
        raise errors.SystemError, "Failed to change directory to %s: %s" % (
            directory, e.strerror)
    os.umask(0)
    os.setsid()

    # do second fork
    try:
        pid = os.fork()
        if pid > 0:
            sys.exit(0) # exit second parent
    except OSError, e:
        sys.stderr.write("Failed to fork: (%d) %s\n" % (e.errno, e.strerror))
        sys.exit(1)

    # Now I am a daemon!
    # don't add stuff here that can fail, because from now on the program
    # will keep running regardless of tracebacks
def startup(processType, processName, daemonize=False, daemonizeTo='/'):
    """
    Prepare a process for starting, logging appropriate standardised
    messages.

    Logs "Starting", optionally daemonizes through daemonizeHelper(), logs
    "Started", and registers reactor shutdown triggers that log "Stopping"
    before and "Stopped" after shutdown.

    @param processType: the process type; also used as the log category
    @param processName: the name of the process, used in the log messages
    @param daemonize:   whether to daemonize the process first.  Note that
                        inside this function the name hides the
                        module-level daemonize() function.
    @type  daemonize:   bool
    @param daemonizeTo: directory handed to daemonizeHelper()
    @type  daemonizeTo: str
    """
    def logStopping():
        log.info(processType, "Stopping %s '%s'", processType, processName)

    def logStopped():
        log.info(processType, "Stopped %s '%s'", processType, processName)

    log.info(processType, "Starting %s '%s'", processType, processName)

    if daemonize:
        daemonizeHelper(processType, daemonizeTo, processName)

    log.info(processType, "Started %s '%s'", processType, processName)

    # import inside function so we avoid affecting startup
    from twisted.internet import reactor
    for phase, callback in (('before', logStopping), ('after', logStopped)):
        reactor.addSystemEventTrigger(phase, 'shutdown', callback)
def daemonizeHelper(processType, daemonizeTo='/', processName=None):
    """
    Daemonize a process, writing log files and PID files to conventional
    locations.

    The log goes to configure.logdir and the PID file to configure.rundir.
    The PID file is opened before forking and filled in afterwards, and a
    reactor trigger deletes it again after shutdown.

    @param processType: The process type, for example 'worker'. Used
                        as part of the log file and PID file names.
    @type  processType: str
    @param daemonizeTo: The directory that the daemon should run in.
    @type  daemonizeTo: str
    @param processName: The service name of the process. Used to
                        disambiguate different instances of the same daemon.
    @type  processName: str

    @raise SystemError: if getPid() reports a pid for this type and name.
        NOTE(review): this module does not import another SystemError, so
        this is the builtin one, unlike the errors.SystemError raised by
        ensureDir() -- confirm which one callers expect.
    """
    ensureDir(configure.logdir, "log file")
    ensureDir(configure.rundir, "run file")

    runningPid = getPid(processType, processName)
    if runningPid:
        raise SystemError(
            "A %s service named '%s' is already running with pid %d"
            % (processType, processName or processType, runningPid))

    log.debug(processType, "%s service named '%s' daemonizing",
        processType, processName)

    if processName:
        logName = '%s.%s.log' % (processType, processName)
    else:
        logName = '%s.log' % (processType,)
    logPath = os.path.join(configure.logdir, logName)
    log.debug(processType, 'Further logging will be done to %s', logPath)

    pidFile = _acquirePidFile(processType, processName)

    # here we daemonize; so we also change our pid
    daemonize(stdout=logPath, stderr=logPath, directory=daemonizeTo)

    log.debug(processType, 'Started daemon')

    # from now on I should keep running until killed, whatever happens
    pidPath = writePidFile(processType, processName, file=pidFile)
    log.debug(processType, 'written pid file %s', pidPath)

    # import inside function so we avoid affecting startup
    from twisted.internet import reactor

    def removePidFile():
        log.debug(processType, 'deleting pid file')
        deletePidFile(processType, processName)

    reactor.addSystemEventTrigger('after', 'shutdown', removePidFile)
def argRepr(args=(), kwargs={}, max=-1):
    """
    Return a string representing the given args, the way they would be
    written in a call: positional reprs first, then key=repr pairs, all
    separated by ", ".

    @param args:   the positional arguments
    @type  args:   tuple or list
    @param kwargs: the keyword arguments; never modified here
    @type  kwargs: dict
    @param max:    currently ignored

    @rtype: str
    """
    # FIXME: rename function
    # FIXME: implement max

    # check validity of input
    assert (type(args) is tuple or
            type(args) is list)
    assert type(kwargs) is dict

    pieces = [repr(value) for value in args]
    pieces.extend([key + '=' + repr(value)
                   for key, value in kwargs.items()])
    return ', '.join(pieces)
329
def ensureDir(dir, description):
    """
    Ensure the given directory exists, creating it (and any missing
    parents) if not.

    @param dir:         path of the directory
    @type  dir:         str
    @param description: what the directory is for; only used in the error
                        message
    @type  description: str

    @raise flumotion.common.errors.SystemError: if the directory could not
        be created
    """
    if os.path.exists(dir):
        return

    try:
        os.makedirs(dir)
    except OSError:
        # Only OSError is handled: a bare except would also turn
        # KeyboardInterrupt and programming errors into a SystemError.
        # sys.exc_info() is used instead of binding the exception in the
        # except clause so the syntax is valid on every python version.
        e = sys.exc_info()[1]
        # another process may have created the directory between the
        # exists() check and makedirs(); that is success, not failure
        if e.errno == errno.EEXIST and os.path.isdir(dir):
            return
        from flumotion.common import errors
        raise errors.SystemError("could not create %s directory %s" % (
            description, dir))
342
def getPidPath(type, name=None):
    """
    Get the full path to the pid file for the given process type and name.

    The file lives in configure.rundir and is called "<type>.<name>.pid",
    or "<type>.pid" when no name is given.

    @rtype: str
    """
    if name:
        fileName = '%s.%s.pid' % (type, name)
    else:
        fileName = '%s.pid' % type
    path = os.path.join(configure.rundir, fileName)
    log.debug('common', 'getPidPath for type %s, name %r: %s' % (
        type, name, path))
    return path
353
def writePidFile(type, name=None, file=None):
    """
    Write a pid file in the run directory, using the given process type
    and process name for the filename.

    @param type: the process type
    @param name: the process name, if any
    @param file: an already opened file to write the pid to, for example
                 the one returned by _acquirePidFile.  When given, type
                 and name are not used and the run directory is not
                 touched.  The file is closed afterwards in either case.

    @rtype:   str
    @returns: full path to the pid file that was written
    """
    if not file:
        ensureDir(configure.rundir, "rundir")
        file = open(getPidPath(type, name), 'w')

    # close the file even when writing fails, so the handle is not leaked
    try:
        file.write("%d\n" % os.getpid())
    finally:
        file.close()
    return file.name
374
def _acquirePidFile(type, name=None):
    """
    Open the PID file for the given process type and process name for
    writing, making sure the run directory exists first.

    Nothing is written yet; the returned file can be passed to
    writePidFile after forking.

    @rtype:   file
    @returns: file object, open for writing
    """
    ensureDir(configure.rundir, "rundir")
    pidFile = open(getPidPath(type, name), 'w')
    return pidFile
387
def deletePidFile(type, name=None):
    """
    Delete the pid file in the run directory, using the given process type
    and process name for the filename.

    Errors from os.unlink (for example a missing file) are not caught.

    @rtype:   str
    @returns: full path to the pid file that was deleted
    """
    pidPath = getPidPath(type, name)
    os.unlink(pidPath)
    return pidPath
399
def getPid(type, name=None):
    """
    Get the pid from the pid file in the run directory, using the given
    process type and process name for the filename.

    Only the pid file is consulted; whether a process with that pid is
    actually alive is not checked here.

    @returns: the pid read from the file, or None if the file does not
              exist, is empty, or does not hold a positive integer.
    """
    pidPath = getPidPath(type, name)
    log.log('common', 'pidfile for %s %s is %s' % (type, name, pidPath))
    if not os.path.exists(pidPath):
        return None

    try:
        handle = open(pidPath, 'r')
    except IOError:
        # sys.exc_info() keeps the syntax valid on every python version
        e = sys.exc_info()[1]
        # the file may have been deleted since the exists() check
        if e.errno == errno.ENOENT:
            return None
        raise
    try:
        line = handle.readline()
    finally:
        handle.close()

    try:
        pid = int(line)
    except ValueError:
        # empty (opened but not yet written) or corrupt pid file
        return None

    # 0 and negative values address process groups when handed to
    # os.kill, so never report them as a pid
    if pid <= 0:
        return None
    return pid
420
def signalPid(pid, signum):
    """
    Send the given process a signal.

    @param pid:    the pid of the process to signal
    @param signum: the signal number to send

    @returns: True if the signal was delivered, False if no process with
              the given pid exists (ESRCH).  Any other OSError is
              re-raised.
    """
    try:
        os.kill(pid, signum)
    except OSError:
        e = sys.exc_info()[1]
        if e.errno != errno.ESRCH:
            # FIXME: unhandled error, maybe give some better info ?
            raise
        return False
    return True
435
def termPid(pid):
    """
    Ask the given process to terminate by sending it SIGTERM.

    @returns: whether or not the process with the given pid was running
    """
    wasRunning = signalPid(pid, signal.SIGTERM)
    return wasRunning
443
def killPid(pid):
    """
    Forcefully stop the given process by sending it SIGKILL.

    @returns: whether or not the process with the given pid was running
    """
    wasRunning = signalPid(pid, signal.SIGKILL)
    return wasRunning
451
def checkPidRunning(pid):
    """
    Check if the given pid is currently running, by sending it signal 0.

    @returns: whether or not a process with that pid is active.
    """
    isRunning = signalPid(pid, 0)
    return isRunning
459
def waitPidFile(type, name=None):
    """
    Wait for the given process type and name to have started and created
    a pid file.

    Blocks, polling getPid() every 0.1 seconds, for as long as it takes;
    there is no timeout.

    @returns: the pid.
    """
    while True:
        # checking before sleeping avoids an unneeded time.sleep
        pid = getPid(type, name)
        if pid:
            return pid
        time.sleep(0.1)
475
def waitForTerm():
    """
    Wait until we get killed by a TERM signal (from someone else).

    Installs a SIGTERM handler and then sleeps in steps of 0.1 seconds
    until that handler has run.  The handler that was installed before is
    not put back afterwards.
    """
    # a dict, so the nested handler can change the flag
    state = {'sleeping': True}

    def onTerm(number, frame):
        state['sleeping'] = False

    signal.signal(signal.SIGTERM, onTerm)

    while state['sleeping']:
        time.sleep(0.1)
def addressGetHost(a):
    """
    Get the host name of an IPv4 or UNIX address.

    @type a: L{twisted.internet.address.IPv4Address} or
             L{twisted.internet.address.UNIXAddress}

    @returns: 'localhost' for a UNIX address; otherwise the address's host
              attribute, falling back to a[1] if it has none.

    @raise TypeError: if a is neither an IPv4Address nor a UNIXAddress
    """
    if isinstance(a, address.UNIXAddress):
        return 'localhost'

    if not isinstance(a, address.IPv4Address):
        # wrap in a tuple: a bare tuple-style address on the right of %
        # would be unpacked as several format arguments and lose the
        # intended message
        raise TypeError("object %r is not an IPv4Address or UNIXAddress"
            % (a,))

    try:
        host = a.host
    except AttributeError:
        host = a[1]
    return host
515
def addressGetPort(a):
    """
    Get the port number of an IPv4 address.

    Uses the address's port attribute, falling back to a[2] if it has
    none.  The type of the address is only checked with an assert.

    @type a: L{twisted.internet.address.IPv4Address}
    """
    assert(isinstance(a, address.IPv4Address))
    try:
        return a.port
    except AttributeError:
        return a[2]
528
def componentPath(componentName, parentName):
    """
    Create a path string "/parentName/componentName" out of the name of a
    component and its parent.

    Note the argument order: the component comes first here.

    @deprecated: Use @componentId instead
    """
    parts = (parentName, componentName)
    return '/%s/%s' % parts
536
def componentId(parentName, componentName):
    """
    Create a C{componentId} of the form "/parentName/componentName".

    A C{componentId} uniquely identifies a component within a planet.

    @since: 0.3.1

    @rtype: str
    """
    parts = (parentName, componentName)
    return '/%s/%s' % parts
548
def parseComponentId(componentId):
    """
    Split a component id ("/flowName/componentName") into its parts.

    The shape of the id is only checked with asserts: it must start with
    a slash and contain exactly two slashes.

    @since: 0.3.1

    @rtype:  tuple of (str, str)
    @return: tuple of (flowName, componentName)
    """
    parts = componentId.split("/")
    assert len(parts) == 3
    assert parts[0] == ''
    flowName, componentName = parts[1:]
    return (flowName, componentName)
562
def feedId(componentName, feedName):
    """
    Create a C{feedId} of the form "componentName:feedName".

    A C{feedId} uniquely identifies a feed within a flow or atmosphere.
    It identifies the feed from a feeder to an eater.

    @since: 0.3.1

    @rtype: str
    """
    parts = (componentName, feedName)
    return "%s:%s" % parts
575
def parseFeedId(feedId):
    """
    Split a feed id ("componentName:feedName") into its parts.

    The shape of the id is only checked with asserts: it must not start
    with a slash and must contain exactly one colon.

    @since: 0.3.1

    @rtype:  tuple of (str, str)
    @return: tuple of (componentName, feedName)
    """
    assert not feedId.startswith('/'), \
           "feedId must not start with '/': %s" % feedId
    parts = feedId.split(":")
    assert len(parts) == 2, "feedId %s should contain exactly one ':'" % feedId
    componentName, feedName = parts
    return (componentName, feedName)
588
def fullFeedId(flowName, componentName, feedName):
    """
    Create a C{fullFeedId} by combining the component id for C{flowName}
    and C{componentName} with C{feedName}.

    A C{fullFeedId} uniquely identifies a feed within a planet.

    @since: 0.3.1

    @rtype: str
    """
    owner = componentId(flowName, componentName)
    return feedId(owner, feedName)
601
def parseFullFeedId(fullFeedId):
    """
    Split a full feed id into its parts: the text before the single colon
    is parsed as a component id, the text after it is the feed name.

    The shape of the id is only checked with asserts.

    @since: 0.3.1

    @rtype:  tuple of (str, str, str)
    @return: tuple of (flowName, componentName, feedName)
    """
    parts = fullFeedId.split(":")
    assert len(parts) == 2
    componentPart, feedName = parts
    flowName, componentName = parseComponentId(componentPart)
    return (flowName, componentName, feedName)
613
def objRepr(object):
    """
    Return the fully qualified class name ("module.ClassName") of the
    given object.

    @rtype: str
    """
    klass = object.__class__
    qualified = (klass.__module__, klass.__name__)
    return "%s.%s" % qualified
620
def pathToModuleName(path):
    """
    Convert the given (relative) path to the python module it would have to
    be imported as.

    A trailing .pyc, .pyo or .py and then a trailing "/__init__" are
    stripped, after which path separators become dots.

    @returns: the dotted module name, or None if no suffix was recognised
              and the path contains a dot.  Dots that remain after a
              suffix was stripped are not rejected.
    """
    # __init__ is last because it works on top of the first three
    recognised = False
    for suffix in ('.pyc', '.pyo', '.py', os.path.sep + '__init__'):
        if path.endswith(suffix):
            path = path[:-len(suffix)]
            recognised = True

    # a path without a known suffix is still accepted when it has no dots
    # at all, such as a package directory
    if not recognised and '.' in path:
        return None

    return path.replace(os.path.sep, '.')
644
def getLL():
    """
    Return the (at most) two-letter language code set for message
    translation.

    The code is the first two characters of the LANGUAGE environment
    variable, or of LANG if LANGUAGE is unset or empty, or 'en' if neither
    has a value.

    @rtype: str
    """
    # LANGUAGE is a GNU extension; it can be colon-separated but we ignore
    # the advanced stuff. If that's not present, just use LANG, as normal.
    # An empty variable counts as not set, so that LANGUAGE="" does not
    # produce an empty language code.
    language = os.environ.get('LANGUAGE') or os.environ.get('LANG') or 'en'
    return language[:2]
659
def gettexter(domain):
    """
    Return a function you can use as _ to translate strings for the given
    domain; it looks every string up with gettext.dgettext.

    @param domain: the gettext domain to translate in
    @type  domain: str
    """
    import gettext

    def translate(s):
        return gettext.dgettext(domain, s)

    return translate
667
def compareVersions(first, second):
    """
    Compare two dotted version strings component by component, as
    integers.  A version with fewer components is padded with zeroes, so
    "1.2" equals "1.2.0".

    A non-numeric component raises ValueError, but only if the comparison
    gets as far as that component.

    @type first:  str
    @type second: str

    @rtype:   int
    @returns: -1, 0 or 1 if first is smaller than, equal to or larger
              than second.
    """
    if first == second:
        return 0

    left = first.split(".")
    right = second.split(".")

    for position in range(max(len(left), len(right))):
        f = 0
        s = 0
        if position < len(left):
            f = int(left[position])
        if position < len(right):
            s = int(right[position])

        if f < s:
            return -1
        if f > s:
            return 1

    return 0
704
def checkVersionsCompat(version, against):
    """Checks if two versions are compatible.

    Versions are compatible if they are from the same minor release. In
    addition, unstable (odd) releases are treated as compatible with
    their subsequent stable (even) releases.  A version newer than
    against is never compatible.

    @param version: version to check
    @type  version: tuple of int
    @param against: version against which we are checking. For versions
                    of core Flumotion, this may be obtained by
                    L{flumotion.configure.configure.version}.
    @type  against: tuple of int
    @returns: True if a configuration from version is compatible with
              against.
    """
    def roundUpToEven(minor):
        # maps an odd minor number onto the even one that follows it
        return ((minor + 1) // 2) * 2

    if version == against:
        return True
    if version > against:
        # e.g. config generated against newer flumotion than what is
        # running
        return False
    if len(version) < 2 or len(against) < 2:
        return False
    if version[0] != against[0]:
        return False
    return roundUpToEven(version[1]) == roundUpToEven(against[1])
734
def versionTupleToString(versionTuple):
    """
    Convert a version tuple to a dotted string.  If the tuple has four
    elements and the last one (the nano number) is zero, that element is
    left out.

    @since: 0.4.1

    @type versionTuple: tuple

    @rtype: str
    """
    parts = versionTuple
    if len(parts) == 4 and parts[3] == 0:
        parts = parts[:3]

    return ".".join(map(str, parts))
750
def _uniq(l, key=lambda x: x):
    """
    Filter out duplicate entries in a list, keeping the first occurrence
    of each and preserving order.

    @param l:   the items to filter
    @param key: function computing the value two items are compared by;
                defaults to the item itself.  It is called exactly once
                per item.

    @rtype:   list
    @returns: a new list without the duplicates
    """
    out = []
    # Keys of the items kept so far.  Remembering them avoids calling
    # key() again on every kept item for each new item.  A list rather
    # than a set, so unhashable keys keep working.
    seen = []
    for x in l:
        k = key(x)
        if k not in seen:
            seen.append(k)
            out.append(x)
    return out
760
def get_all_methods(obj, method, subclass_first):
    """
    Collect every distinct implementation of a method along the method
    resolution order of the object's type.

    @param obj:            the object whose type is inspected
    @param method:         the name of the method to look for
    @type  method:         str
    @param subclass_first: if true, follow the method resolution order
                           (subclasses first); if false, the reverse
                           order, starting with the base class
    @type  subclass_first: bool

    @rtype:   list
    @returns: the methods as looked up on the classes, each underlying
              function only once
    """
    # copy the mro into a list so it can be reversed in place
    classes = list(type(obj).__mro__)
    if not subclass_first:
        classes.reverse()

    found = []
    for klass in classes:
        if not hasattr(klass, method):
            continue
        candidate = getattr(klass, method)
        assert callable(candidate) and hasattr(candidate, 'im_func'),\
            'attr %s of class %s is not a method' % (method, klass)
        found.append(candidate)

    # In a hierarchy A -> B, if A implements the method, B will inherit
    # it as well. Compare the functions implementing the methods so as
    # to avoid calling them twice.
    return _uniq(found, lambda proc: proc.im_func)
780
def call_each_method(obj, method, *args, **kwargs):
    """
    Invoke all implementations of a method on an object.

    The implementations are found in the object's class and all of its
    superclasses, and are called in method resolution order, which goes
    from subclasses to superclasses.  Each one is called with the object
    followed by the given arguments; return values are discarded.
    """
    implementations = get_all_methods(obj, method, True)
    for implementation in implementations:
        implementation(obj, *args, **kwargs)
791
def call_each_method_reversed(obj, method, *args, **kwargs):
    """
    Invoke all implementations of a method on an object.

    Like call_each_method, but calls the methods in reverse method
    resolution order, from superclasses to subclasses.  Return values
    are discarded.
    """
    implementations = get_all_methods(obj, method, False)
    for implementation in implementations:
        implementation(obj, *args, **kwargs)
801
class InitMixin(object):
    """
    A mixin class to help with object initialization.

    In some class hierarchies, __init__ is only used for initializing
    instance variables. In these cases it is advantageous to avoid the
    need to "chain up" to a parent implementation of a method. Adding
    this class to your hierarchy will, for each class in the object's
    class hierarchy, call the class's init() implementation on the
    object.

    Note that the function is called init() without underscores, and
    that there is no need to chain up to superclasses' implementations.

    Uses call_each_method_reversed() internally, so the init()
    implementations run from superclasses to subclasses.
    """

    def __init__(self, *args, **kwargs):
        """
        Call every init() implementation in the class hierarchy, passing
        on all positional and keyword arguments unchanged.
        """
        call_each_method_reversed(self, 'init', *args, **kwargs)
821
def strToBool(string):
    """
    Interpret a string as a boolean.  Only the exact strings 'True',
    'true', '1' and 'yes' count as true.

    @type string: str

    @rtype:  bool
    @return: True if the string represents a value we interpret as true.
    """
    return string in ('True', 'true', '1', 'yes')
832
def assertSSLAvailable():
    """Assert that twisted has support for SSL connections.

    Checks the sslEnabled flag of twisted.internet.posixbase.

    @raise flumotion.common.errors.NoSSLError: if that flag is false
    """
    from twisted.internet import posixbase
    from flumotion.common import errors

    if posixbase.sslEnabled:
        return
    raise errors.NoSSLError()
841
class Poller(object, log.Loggable):
    """
    A class representing a cancellable, periodic call to a procedure,
    which is robust in the face of exceptions raised by the procedure.

    The poller will wait for a specified number of seconds between
    calls. The time taken for the procedure to complete is not counted
    in the timeout. If the procedure returns a deferred, rescheduling
    will be performed after the deferred fires.

    For example, if the timeout is 10 seconds and the procedure returns
    a deferred which fires 5 seconds later, the next invocation of the
    procedure will be performed 15 seconds after the previous
    invocation.
    """

    def __init__(self, proc, timeout, immediately=False, start=True):
        """
        @param proc: a procedure of no arguments
        @param timeout: float number of seconds to wait between calls
        @param immediately: whether to immediately call proc, or to wait
                            until one period has passed
        @param start: whether to start the poller (defaults to
                      True)
        """
        # imported here rather than at module level; see the note on
        # imports at the top of this file
        from twisted.internet import reactor
        from twisted.internet import defer

        self._callLater = reactor.callLater
        self._maybeDeferred = defer.maybeDeferred

        self.proc = proc
        # proc must have a __name__; it becomes part of the log name
        self.logName = 'poller-%s' % proc.__name__
        self.timeout = timeout

        # the pending delayed call, or None when nothing is scheduled
        self._dc = None
        self.running = False

        if start:
            self.start(immediately)

    def start(self, immediately=False):
        """Start the poller.

        This procedure is called during __init__, so it is normally not
        necessary to call it. It will ensure that the poller is running,
        even after a previous call to stop().

        @param immediately: whether to immediately invoke the poller, or
                            to wait until one period has passed
        """
        if self.running:
            self.debug('already running')
        else:
            self.running = True
            self._reschedule(immediately)

    def _reschedule(self, immediately=False):
        """
        Arrange the next call of run(): right now if immediately is true,
        otherwise after self.timeout seconds.  Does nothing but log when
        the poller has been stopped.
        """
        # run() clears _dc before proc is called, so nothing may be
        # pending at this point
        assert self._dc is None
        if self.running:
            if immediately:
                self.run()
            else:
                self._dc = self._callLater(self.timeout, self.run)
        else:
            self.debug('shutting down, not rescheduling')

    def run(self):
        """Run the poller immediately, regardless of when it was last
        run.
        """
        def reschedule(v):
            # added with addBoth, so this runs for results and failures
            # alike; v is passed through untouched
            self._reschedule()
            return v

        if self._dc and self._dc.active():
            # we don't get here in the normal periodic case, only for
            # explicit run() invocations
            self._dc.cancel()
        # NOTE(review): the indentation of this assignment was lost in
        # the source this was recovered from.  It is placed outside the
        # if because _reschedule asserts that _dc is None after a normal
        # periodic run -- confirm against the original file.
        self._dc = None
        d = self._maybeDeferred(self.proc)
        d.addBoth(reschedule)

    def stop(self):
        """Stop the poller.

        This procedure ensures that the poller is stopped. It may be
        called multiple times.
        """
        if self._dc:
            self._dc.cancel()
            self._dc = None
        self.running = False
936
def strftime(format, t):
    """
    A version of time.strftime that can handle unicode formats.

    Only the two-character directives ("%" plus one character) are handed
    to time.strftime, one at a time; all other characters are copied to
    the output as they are.  A lone "%" at the end of the format is kept.

    @param format: the format string
    @param t:      the time, as accepted by time.strftime
    """
    pieces = []
    position = 0
    length = len(format)
    while position < length:
        char = format[position]
        if char != '%':
            pieces.append(char)
            position += 1
        elif position + 1 < length:
            pieces.append(time.strftime('%' + format[position + 1], t))
            position += 2
        else:
            # trailing percent sign with nothing after it
            pieces.append('%')
            position += 1
    return ''.join(pieces)
952