1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 small common functions used by all processes
24 """
25
26 import errno
27 import os
28 import sys
29 import time
30 import signal
31 import locale
32
33 from twisted.internet import address
34
35 from flumotion.common import log
36
37
38
39
40 from flumotion.configure import configure
41
75
125
137
139 """
140 Print a version block for the flumotion binaries.
141
142 @arg binary: name of the binary
143 @type binary: string
144 """
145
146 block = []
147 block.append("%s %s" % (binary, configure.version))
148 block.append("part of Flumotion - a streaming media server")
149 block.append("(C) Copyright 2004,2005,2006,2007 Fluendo")
150 return "\n".join(block)
151
153 """
154 Merge the __implements__ tuples of the given classes into one tuple.
155 """
156 allYourBase = ()
157 for clazz in classes:
158 try:
159 interfaces = [i for i in clazz.__implemented__]
160 except AttributeError:
161
162
163
164 interfaces = []
165 for interface in interfaces:
166 allYourBase += (interface,)
167 return allYourBase
168
169 -def daemonize(stdin='/dev/null', stdout='/dev/null', stderr='/dev/null',
170 directory='/'):
171 '''
172 This forks the current process into a daemon.
173 The stdin, stdout, and stderr arguments are file names that
174 will be opened and be used to replace the standard file descriptors
175 in sys.stdin, sys.stdout, and sys.stderr.
176 These arguments are optional and default to /dev/null.
177
178 The fork will switch to the given directory.
179 '''
180
181 si = open(stdin, 'r')
182 os.dup2(si.fileno(), sys.stdin.fileno())
183 try:
184 log.outputToFiles(stdout, stderr)
185 except IOError, e:
186 if e.errno == errno.EACCES:
187 log.error('common', 'Permission denied writing to log file %s.',
188 e.filename)
189
190
191 try:
192 pid = os.fork()
193 if pid > 0:
194 sys.exit(0)
195 except OSError, e:
196 sys.stderr.write("Failed to fork: (%d) %s\n" % (e.errno, e.strerror))
197 sys.exit(1)
198
199
200 try:
201 os.chdir(directory)
202 except OSError, e:
203 from flumotion.common import errors
204 raise errors.SystemError, "Failed to change directory to %s: %s" % (
205 directory, e.strerror)
206 os.umask(0)
207 os.setsid()
208
209
210 try:
211 pid = os.fork()
212 if pid > 0:
213 sys.exit(0)
214 except OSError, e:
215 sys.stderr.write("Failed to fork: (%d) %s\n" % (e.errno, e.strerror))
216 sys.exit(1)
217
218
219
220
221
222 -def startup(processType, processName, daemonize=False, daemonizeTo='/'):
223 """
224 Prepare a process for starting, logging appropriate standarised messages.
225 First daemonizes the process, if daemonize is true.
226 """
227 log.info(processType, "Starting %s '%s'", processType, processName)
228
229 if daemonize:
230 daemonizeHelper(processType, daemonizeTo, processName)
231
232 log.info(processType, "Started %s '%s'", processType, processName)
233
234 def shutdownStarted():
235 log.info(processType, "Stopping %s '%s'", processType, processName)
236 def shutdownEnded():
237 log.info(processType, "Stopped %s '%s'", processType, processName)
238
239
240 from twisted.internet import reactor
241 reactor.addSystemEventTrigger('before', 'shutdown',
242 shutdownStarted)
243 reactor.addSystemEventTrigger('after', 'shutdown',
244 shutdownEnded)
245
247 """
248 Daemonize a process, writing log files and PID files to conventional
249 locations.
250
251 @param processType: The process type, for example 'worker'. Used
252 as part of the log file and PID file names.
253 @type processType: str
254 @param daemonizeTo: The directory that the daemon should run in.
255 @type daemonizeTo: str
256 @param processName: The service name of the process. Used to
257 disambiguate different instances of the same daemon.
258 @type processName: str
259 """
260
261 ensureDir(configure.logdir, "log file")
262 ensureDir(configure.rundir, "run file")
263
264 pid = getPid(processType, processName)
265 if pid:
266 raise SystemError(
267 "A %s service named '%s' is already running with pid %d"
268 % (processType, processName or processType, pid))
269
270 log.debug(processType, "%s service named '%s' daemonizing",
271 processType, processName)
272
273 if processName:
274 logPath = os.path.join(configure.logdir,
275 '%s.%s.log' % (processType, processName))
276 else:
277 logPath = os.path.join(configure.logdir,
278 '%s.log' % (processType,))
279 log.debug(processType, 'Further logging will be done to %s', logPath)
280
281 file = _acquirePidFile(processType, processName)
282
283
284 daemonize(stdout=logPath, stderr=logPath, directory=daemonizeTo)
285
286 log.debug(processType, 'Started daemon')
287
288
289 path = writePidFile(processType, processName, file=file)
290 log.debug(processType, 'written pid file %s', path)
291
292
293 from twisted.internet import reactor
294 def _deletePidFile():
295 log.debug(processType, 'deleting pid file')
296 deletePidFile(processType, processName)
297 reactor.addSystemEventTrigger('after', 'shutdown',
298 _deletePidFile)
299
300
301 -def argRepr(args=(), kwargs={}, max=-1):
302 """
303 Return a string representing the given args.
304 """
305
306
307
308
309 assert (type(args) is tuple or
310 type(args) is list)
311 assert type(kwargs) is dict
312
313 s = ''
314 args = list(args)
315
316 if args:
317 args = map(repr, args)
318 s += ', '.join(args)
319
320 if kwargs:
321 r = [(key + '=' + repr(item))
322 for key, item in kwargs.items()]
323
324 if s:
325 s += ', '
326 s += ', '.join(r)
327
328 return s
329
331 """
332 Ensure the given directory exists, creating it if not.
333 Raises a SystemError if this fails, including the given description.
334 """
335 if not os.path.exists(dir):
336 try:
337 os.makedirs(dir)
338 except:
339 from flumotion.common import errors
340 raise errors.SystemError, "could not create %s directory %s" % (
341 description, dir)
342
353
374
376 """
377 Open a PID file for writing, using the given process type and
378 process name for the filename. The returned file can be then passed
379 to writePidFile after forking.
380
381 @rtype: str
382 @returns: file object, open for writing
383 """
384 ensureDir(configure.rundir, "rundir")
385 path = getPidPath(type, name)
386 return open(path, 'w')
387
389 """
390 Delete the pid file in the run directory, using the given process type
391 and process name for the filename.
392
393 @rtype: str
394 @returns: full path to the pid file that was written
395 """
396 path = getPidPath(type, name)
397 os.unlink(path)
398 return path
399
401 """
402 Get the pid from the pid file in the run directory, using the given
403 process type and process name for the filename.
404
405 @returns: pid of the process, or None if not running or file not found.
406 """
407
408 pidPath = getPidPath(type, name)
409 log.log('common', 'pidfile for %s %s is %s' % (type, name, pidPath))
410 if not os.path.exists(pidPath):
411 return
412
413 file = open(pidPath, 'r')
414 pid = file.readline()
415 file.close()
416 if not pid or int(pid) == 0:
417 return
418
419 return int(pid)
420
422 """
423 Send the given process a signal.
424
425 @returns: whether or not the process with the given pid was running
426 """
427 try:
428 os.kill(pid, signum)
429 return True
430 except OSError, e:
431 if not e.errno == errno.ESRCH:
432
433 raise
434 return False
435
437 """
438 Send the given process a TERM signal.
439
440 @returns: whether or not the process with the given pid was running
441 """
442 return signalPid(pid, signal.SIGTERM)
443
445 """
446 Send the given process a KILL signal.
447
448 @returns: whether or not the process with the given pid was running
449 """
450 return signalPid(pid, signal.SIGKILL)
451
453 """
454 Check if the given pid is currently running.
455
456 @returns: whether or not a process with that pid is active.
457 """
458 return signalPid(pid, 0)
459
461 """
462 Wait for the given process type and name to have started and created
463 a pid file.
464
465 Return the pid.
466 """
467
468 pid = getPid(type, name)
469
470 while not pid:
471 time.sleep(0.1)
472 pid = getPid(type, name)
473
474 return pid
475
477 """
478 Wait until we get killed by a TERM signal (from someone else).
479 """
480
481 class Waiter:
482 def __init__(self):
483 self.sleeping = True
484 import signal
485 self.oldhandler = signal.signal(signal.SIGTERM,
486 self._SIGTERMHandler)
487
488 def _SIGTERMHandler(self, number, frame):
489 self.sleeping = False
490
491 def sleep(self):
492 while self.sleeping:
493 time.sleep(0.1)
494
495 waiter = Waiter()
496 waiter.sleep()
497
499 """
500 Get the host name of an IPv4 address.
501
502 @type a: L{twisted.internet.address.IPv4Address}
503 """
504 if not isinstance(a, address.IPv4Address) and not isinstance(a,
505 address.UNIXAddress):
506 raise TypeError("object %r is not an IPv4Address or UNIXAddress" % a)
507 if isinstance(a, address.UNIXAddress):
508 return 'localhost'
509
510 try:
511 host = a.host
512 except AttributeError:
513 host = a[1]
514 return host
515
517 """
518 Get the port number of an IPv4 address.
519
520 @type a: L{twisted.internet.address.IPv4Address}
521 """
522 assert(isinstance(a, address.IPv4Address))
523 try:
524 port = a.port
525 except AttributeError:
526 port = a[2]
527 return port
528
530 """
531 Create a path string out of the name of a component and its parent.
532
533 @deprecated: Use @componentId instead
534 """
535 return '/%s/%s' % (parentName, componentName)
536
538 """
539 Create a C{componentId} based on the C{parentName} and C{componentName}.
540
541 A C{componentId} uniquely identifies a component within a planet.
542
543 @since: 0.3.1
544
545 @rtype: str
546 """
547 return '/%s/%s' % (parentName, componentName)
548
550 """
551 Parses a component id ("/flowName/componentName") into its parts.
552
553 @since: 0.3.1
554
555 @rtype: tuple of (str, str)
556 @return: tuple of (flowName, componentName)
557 """
558 list = componentId.split("/")
559 assert len(list) == 3
560 assert list[0] == ''
561 return (list[1], list[2])
562
563 -def feedId(componentName, feedName):
564 """
565 Create a C{feedId} based on the C{componentName} and C{feedName}.
566
567 A C{feedId} uniquely identifies a feed within a flow or atmosphere.
568 It identifies the feed from a feeder to an eater.
569
570 @since: 0.3.1
571
572 @rtype: str
573 """
574 return "%s:%s" % (componentName, feedName)
575
577 """
578 @since: 0.3.1
579
580 @rtype: tuple of (str, str)
581 @return: tuple of (componentName, feedName)
582 """
583 assert not feedId.startswith('/'), \
584 "feedId must not start with '/': %s" % feedId
585 list = feedId.split(":")
586 assert len(list) == 2, "feedId %s should contain exactly one ':'" % feedId
587 return (list[0], list[1])
588
589 -def fullFeedId(flowName, componentName, feedName):
590 """
591 Create a C{fullFeedId} based on the C{flowName}, C{componentName} and
592 C{feedName}.
593
594 A C{fullFeedId} uniquely identifies a feed within a planet.
595
596 @since: 0.3.1
597
598 @rtype: str
599 """
600 return feedId(componentId(flowName, componentName), feedName)
601
603 """
604 @since: 0.3.1
605
606 @rtype: tuple of (str, str, str)
607 @return: tuple of (flowName, componentName, feedName)
608 """
609 list = fullFeedId.split(":")
610 assert len(list) == 2
611 flowName, componentName = parseComponentId(list[0])
612 return (flowName, componentName, list[1])
613
615 """
616 Return a string giving the fully qualified class of the given object.
617 """
618 c = object.__class__
619 return "%s.%s" % (c.__module__, c.__name__)
620
622 """
623 Convert the given (relative) path to the python module it would have to
624 be imported as.
625
626 Return None if the path is not a valid python module
627 """
628
629 valid = False
630 suffixes = ['.pyc', '.pyo', '.py', os.path.sep + '__init__']
631 for s in suffixes:
632 if path.endswith(s):
633 path = path[:-len(s)]
634 valid = True
635
636
637 if not '.' in path:
638 valid = True
639
640 if not valid:
641 return None
642
643 return ".".join(path.split(os.path.sep))
644
646 """
647 Return the (at most) two-letter language code set for message translation.
648 """
649
650
651 language = os.environ.get('LANGUAGE', None)
652 if language != None:
653 LL = language[:2]
654 else:
655 lang = os.environ.get('LANG', 'en')
656 LL = lang[:2]
657
658 return LL
659
660 -def gettexter(domain):
661 """
662 Returns a method you can use as _ to translate strings for the given
663 domain.
664 """
665 import gettext
666 return lambda s: gettext.dgettext(domain, s)
667
669 """
670 Compares two version strings. Returns -1, 0 or 1 if first is smaller than,
671 equal to or larger than second.
672
673 @type first: str
674 @type second: str
675
676 @rtype: int
677 """
678 if first == second:
679 return 0
680
681 firsts = first.split(".")
682 seconds = second.split(".")
683
684 while firsts or seconds:
685 f = 0
686 s = 0
687 try:
688 f = int(firsts[0])
689 del firsts[0]
690 except IndexError:
691 pass
692 try:
693 s = int(seconds[0])
694 del seconds[0]
695 except IndexError:
696 pass
697
698 if f < s:
699 return -1
700 if f > s:
701 return 1
702
703 return 0
704
706 """Checks if two versions are compatible.
707
708 Versions are compatible if they are from the same minor release. In
709 addition, unstable (odd) releases are treated as compatible with
710 their subsequent stable (even) releases.
711
712 @param version: version to check
713 @type version: tuple of int
714 @param against: version against which we are checking. For versions
715 of core Flumotion, this may be obtained by
716 L{flumotion.configure.configure.version}.
717 @type against: tuple of int
718 @returns: True if a configuration from version is compatible with
719 against.
720 """
721 if version == against:
722 return True
723 elif version > against:
724
725
726 return False
727 elif len(version) < 2 or len(against) < 2:
728 return False
729 elif version[0] != against[0]:
730 return False
731 else:
732 round2 = lambda x: ((x + 1) // 2) * 2
733 return round2(version[1]) == round2(against[1])
734
736 """
737 Converts a version tuple to a string. If the tuple has a zero nano number,
738 it is dropped from the string.
739
740 @since: 0.4.1
741
742 @type versionTuple: tuple
743
744 @rtype: str
745 """
746 if len(versionTuple) == 4 and versionTuple[3] == 0:
747 versionTuple = versionTuple[:3]
748
749 return ".".join([str(i) for i in versionTuple])
750
751 -def _uniq(l, key=lambda x: x):
752 """
753 Filters out duplicate entries in a list.
754 """
755 out = []
756 for x in l:
757 if key(x) not in [key(y) for y in out]:
758 out.append(x)
759 return out
760
762 mro = type(obj).__mro__
763 if not subclass_first:
764
765
766 mro = list(mro)
767 mro.reverse()
768 procs = []
769 for c in mro:
770 if hasattr(c, method):
771 proc = getattr(c, method)
772 assert callable(proc) and hasattr(proc, 'im_func'),\
773 'attr %s of class %s is not a method' % (method, c)
774 procs.append(proc)
775
776
777
778
779 return _uniq(procs, lambda proc: proc.im_func)
780
782 """
783 Invoke all implementations of a method on an object.
784
785 Searches for method implementations in the object's class and all of
786 the class' superclasses. Calls the methods in method resolution
787 order, which goes from subclasses to superclasses.
788 """
789 for proc in get_all_methods(obj, method, True):
790 proc(obj, *args, **kwargs)
791
793 """
794 Invoke all implementations of a method on an object.
795
796 Like call_each_method, but calls the methods in reverse method
797 resolution order, from superclasses to subclasses.
798 """
799 for proc in get_all_methods(obj, method, False):
800 proc(obj, *args, **kwargs)
801
803 """
804 A mixin class to help with object initialization.
805
806 In some class hierarchies, __init__ is only used for initializing
807 instance variables. In these cases it is advantageous to avoid the
808 need to "chain up" to a parent implementation of a method. Adding
809 this class to your hierarchy will, for each class in the object's
810 class hierarchy, call the class's init() implementation on the
811 object.
812
813 Note that the function is called init() without underscrores, and
814 that there is no need to chain up to superclasses' implementations.
815
816 Uses call_each_method_reversed() internally.
817 """
818
821
823 """
824 @type string: str
825
826 @return: True if the string represents a value we interpret as true.
827 """
828 if string in ('True', 'true', '1', 'yes'):
829 return True
830
831 return False
832
834 """Assert that twisted has support for SSL connections.
835 """
836 from twisted.internet import posixbase
837 from flumotion.common import errors
838
839 if not posixbase.sslEnabled:
840 raise errors.NoSSLError()
841
842 -class Poller(object, log.Loggable):
843 """
844 A class representing a cancellable, periodic call to a procedure,
845 which is robust in the face of exceptions raised by the procedure.
846
847 The poller will wait for a specified number of seconds between
848 calls. The time taken for the procedure to complete is not counted
849 in the timeout. If the procedure returns a deferred, rescheduling
850 will be performed after the deferred fires.
851
852 For example, if the timeout is 10 seconds and the procedure returns
853 a deferred which fires 5 seconds later, the next invocation of the
854 procedure will be performed 15 seconds after the previous
855 invocation.
856 """
857
858 - def __init__(self, proc, timeout, immediately=False, start=True):
859 """
860 @param proc: a procedure of no arguments
861 @param timeout: float number of seconds to wait between calls
862 @param immediately: whether to immediately call proc, or to wait
863 until one period has passed
864 @param immediately: whether to start the poller (defaults to
865 True)
866 """
867 from twisted.internet import reactor
868 from twisted.internet import defer
869
870 self._callLater = reactor.callLater
871 self._maybeDeferred = defer.maybeDeferred
872
873 self.proc = proc
874 self.logName = 'poller-%s' % proc.__name__
875 self.timeout = timeout
876
877 self._dc = None
878 self.running = False
879
880 if start:
881 self.start(immediately)
882
883 - def start(self, immediately=False):
884 """Start the poller.
885
886 This procedure is called during __init__, so it is normally not
887 necessary to call it. It will ensure that the poller is running,
888 even after a previous call to stop().
889
890 @param immediately: whether to immediately invoke the poller, or
891 to wait until one period has passed
892 """
893 if self.running:
894 self.debug('already running')
895 else:
896 self.running = True
897 self._reschedule(immediately)
898
900 assert self._dc is None
901 if self.running:
902 if immediately:
903 self.run()
904 else:
905 self._dc = self._callLater(self.timeout, self.run)
906 else:
907 self.debug('shutting down, not rescheduling')
908
910 """Run the poller immediately, regardless of when it was last
911 run.
912 """
913 def reschedule(v):
914 self._reschedule()
915 return v
916
917 if self._dc and self._dc.active():
918
919
920 self._dc.cancel()
921 self._dc = None
922
923 d = self._maybeDeferred(self.proc)
924 d.addBoth(reschedule)
925
927 """Stop the poller.
928
929 This procedure ensures that the poller is stopped. It may be
930 called multiple times.
931 """
932 if self._dc:
933 self._dc.cancel()
934 self._dc = None
935 self.running = False
936
938 """A version of time.strftime that can handle unicode formats."""
939 out = []
940 percent = False
941 for c in format:
942 if percent:
943 out.append(time.strftime('%'+c, t))
944 percent = False
945 elif c == '%':
946 percent = True
947 else:
948 out.append(c)
949 if percent:
950 out.append('%')
951 return ''.join(out)
952