Package flumotion :: Package component :: Package base :: Module scheduler
[hide private]

Source Code for Module flumotion.component.base.scheduler

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22   
 23  import time 
 24  from datetime import datetime, timedelta, tzinfo 
 25   
 26  from twisted.internet import reactor 
 27   
 28  from flumotion.common import log, avltree 
 29  from flumotion.component.base import watcher 
 30   
 31   
 32  # A class capturing the platform's idea of local time, from the 
 33  # documentation of datetime.tzinfo. 
class LocalTimezone(tzinfo):
    """
    I am a tzinfo that reflects the platform's notion of local time.

    My offsets are computed once, at class-definition time, from the
    C library values exposed by the time module.
    """

    # Offset from UTC while standard time is in effect.
    STDOFFSET = timedelta(seconds=-time.timezone)
    # Offset from UTC while DST is in effect; identical to STDOFFSET on
    # platforms that report no DST zone.
    DSTOFFSET = STDOFFSET
    if time.daylight:
        DSTOFFSET = timedelta(seconds=-time.altzone)
    # Amount added to standard time by DST.
    DSTDIFF = DSTOFFSET - STDOFFSET
    ZERO = timedelta(0)

    def utcoffset(self, dt):
        """
        @param dt: the wall-clock time to inspect
        @type  dt: datetime

        @returns: DSTOFFSET when dt falls in DST, STDOFFSET otherwise
        """
        if not self._isdst(dt):
            return self.STDOFFSET
        return self.DSTOFFSET

    def dst(self, dt):
        """
        @param dt: the wall-clock time to inspect
        @type  dt: datetime

        @returns: DSTDIFF when dt falls in DST, ZERO otherwise
        """
        if not self._isdst(dt):
            return self.ZERO
        return self.DSTDIFF

    def tzname(self, dt):
        """
        @param dt: the wall-clock time to inspect
        @type  dt: datetime

        @returns: the entry of time.tzname selected by whether dt is in DST
        """
        inDst = self._isdst(dt)
        return time.tzname[inDst]

    def _isdst(self, dt):
        """
        Tell whether the local wall-clock time dt falls in DST.

        The fields are spelled out by hand instead of using
        dt.timetuple(), which would call back into dst() for an aware
        datetime.
        """
        # Last field -1: let mktime work out the DST flag itself.
        fields = (dt.year, dt.month, dt.day,
                  dt.hour, dt.minute, dt.second,
                  dt.weekday(), 0, -1)
        stamp = time.mktime(fields)
        return time.localtime(stamp).tm_isdst > 0


# Shared instance used wherever this module needs the local timezone.
LOCAL = LocalTimezone()
def now(tz=LOCAL):
    """
    Get the current time as a datetime expressed in the given timezone.

    @param tz: timezone to express the result in; defaults to LOCAL
    @type  tz: tzinfo

    @rtype: datetime
    """
    current = datetime.now(tz)
    return current
68 69
class Event(log.Loggable):
    """
    I am an event. I have a start and stop time and a "content" that can
    be anything. I can recur.

    @ivar start:   start of the event, always carrying a tzinfo
    @type start:   datetime
    @ivar end:     end of the event, always carrying a tzinfo
    @type end:     datetime
    @ivar content: whatever was handed to the constructor
    @ivar recur:   recurrence rule, or None for a one-shot event
    @type recur:   None, str, or datetime.timedelta
    """

    def __init__(self, start, end, content, recur=None, now=None):
        """
        @param start:   wall-clock time of event start; a naive value is
                        taken to be in the local timezone
        @type  start:   datetime
        @param end:     wall-clock time of event end; a naive value is
                        taken to be in the local timezone
        @type  end:     datetime
        @param content: content of this event
        @param recur:   recurrence rule, either as a string parseable by
                        dateutil.rrule.rrulestr or as a datetime.timedelta
        @type  recur:   None, str, or datetime.timedelta
        @param now:     reference time used to skip past occurrences of
                        a recurring event; defaults to the current local
                        time
        @type  now:     datetime
        """
        self.debug('new event, content=%r, start=%r, end=%r', content,
                   start, end)

        # Localise before the first comparison: comparing a naive
        # datetime with an aware one raises TypeError.
        start = self._withLocalTimezone(start, 'starting')
        end = self._withLocalTimezone(end, 'ending')

        assert start < end

        if recur:
            from dateutil import rrule
            if now is None:
                now = datetime.now(LOCAL)

            # The rules are built even when no adjustment is needed, so
            # that an unparseable rule string is reported right away.
            if isinstance(recur, timedelta):
                interval = recur.days*24*60*60 + recur.seconds
                endRecurRule = rrule.rrule(rrule.SECONDLY,
                                           interval=interval,
                                           dtstart=end)
                startRecurRule = rrule.rrule(rrule.SECONDLY,
                                             interval=interval,
                                             dtstart=start)
            else:
                endRecurRule = rrule.rrulestr(recur, dtstart=end)
                startRecurRule = rrule.rrulestr(recur, dtstart=start)

            if end < now:
                # after()/before() return None when the rule has no
                # matching occurrence (e.g. COUNT or UNTIL ran out).
                nextEnd = endRecurRule.after(now)
                nextStart = None
                if nextEnd is not None:
                    nextStart = startRecurRule.before(nextEnd)

                if nextStart is None:
                    # Keep the original (past) times; reschedule() and
                    # the scheduler treat an event that ended already
                    # as finished.
                    self.debug("recurrence %r has no occurrence after "
                               "%r; keeping start and end times", recur,
                               now)
                else:
                    start = self._withLocalTimezone(nextStart, 'starting')
                    end = self._withLocalTimezone(nextEnd, 'ending')
                    self.debug("adjusting start and end times to %r, %r",
                               start, end)

        self.start = start
        self.end = end
        self.content = content
        self.recur = recur

    def _withLocalTimezone(self, dt, verb):
        """
        Return dt unchanged if it has a tzinfo, else dt tagged with LOCAL.

        @param verb: 'starting' or 'ending'; only used in the log message
        """
        if dt.tzinfo is not None:
            return dt
        self.info('event %s at %r does not have timezone '
                  'info; using local time zone', verb, dt)
        return dt.replace(tzinfo=LOCAL)

    def reschedule(self, now=None):
        """
        Build the event for my current or next occurrence.

        @param now: reference time; defaults to the current local time
        @type  now: datetime

        @returns: a new Event, or None if I do not recur or my
                  recurrence has no occurrence ending after now
        """
        if not self.recur:
            return None
        if now is None:
            now = datetime.now(LOCAL)
        event = Event(self.start, self.end, self.content, self.recur, now)
        if event.end < now:
            # The constructor could not move the event forward: the
            # recurrence is exhausted.
            return None
        return event

    def toTuple(self):
        """
        @returns: (start, end, content, recur), the tuple that defines
                  my ordering and equality
        """
        return self.start, self.end, self.content, self.recur

    def __repr__(self):
        return '<Event %r>' % (self.toTuple(),)

    def __lt__(self, other):
        return self.toTuple() < other.toTuple()

    def __gt__(self, other):
        return self.toTuple() > other.toTuple()

    def __eq__(self, other):
        return self.toTuple() == other.toTuple()

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__; spell it out so both
        # operators agree.
        return not self.__eq__(other)
147 148
class EventStore(avltree.AVLTree, log.Loggable):
    """
    I am an AVL tree of events that logs, instead of raising, when an
    event that is already stored is inserted again.
    """

    def __init__(self, events):
        """
        @param events: the events to store initially
        @type  events: iterable of Event
        """
        avltree.AVLTree.__init__(self)
        for item in events:
            self.insert(item)

    def insert(self, event):
        """
        Add an event to the store.

        @returns: True if the event was stored, False if the underlying
                  tree refused it with a ValueError
        """
        try:
            avltree.AVLTree.insert(self, event)
        except ValueError:
            self.warning('an identical event to %r already exists in '
                         'store', event)
            return False
        return True
163 164
class Scheduler(log.Loggable):
    """
    I keep track of upcoming events.

    I can provide notifications when events stop and start, and maintain
    a set of current events.

    @ivar current:     events that have started and not yet stopped, in
                       the order their start was notified
    @type current:     list of Event
    @ivar events:      all events I know about
    @type events:      EventStore
    @ivar subscribers: subscription id -> (eventStarted, eventStopped)
    @type subscribers: dict
    """

    def __init__(self):
        self.current = []
        # The single pending reactor call, for the next start or stop.
        self._delayedCall = None
        self._subscribeId = 0
        self.subscribers = {}
        # Creates self.events and arms the (empty) schedule.
        self.replaceEvents([])

    def addEvent(self, start, end, content, recur=None, now=None):
        """Add a new event to the scheduler.

        @param start:   wall-clock time of event start
        @type  start:   datetime
        @param end:     wall-clock time of event end
        @type  end:     datetime
        @param content: content of this event
        @type  content: str
        @param recur:   recurrence rule, either as a string parseable by
                        dateutil.rrule.rrulestr or as a datetime.timedelta
        @type  recur:   None, str, or datetime.timedelta
        @param now:     reference time; defaults to the current local
                        time
        @type  now:     datetime

        @returns: an Event that can later be passed to removeEvent, if
                  so desired. The event will be removed or rescheduled
                  automatically when it stops. The Event is returned
                  even when it was not scheduled because it lies in the
                  past or duplicates a stored event.
        """
        if now is None:
            now = datetime.now(LOCAL)
        event = Event(start, end, content, recur, now)
        if event.end < now:
            self.warning('attempted to schedule event in the past: %r',
                         event)
        else:
            if self.events.insert(event):
                if event.start < now:
                    # Already in progress: notify right away.
                    self._eventStarted(event)
                self._reschedule()
        return event

    def removeEvent(self, event):
        """Remove an event from the scheduler.

        @param event: an event, as returned from addEvent()
        @type  event: Event
        """
        # A recurring event may have been replaced in the store by a
        # later occurrence; that occurrence is the one to remove.
        currentEvent = event.reschedule() or event
        self.events.delete(currentEvent)
        if currentEvent in self.current:
            self._eventStopped(currentEvent)
        self._reschedule()

    def getCurrentEvents(self):
        """
        @returns: the content of every event currently in progress
        @rtype:   list
        """
        return [e.content for e in self.current]

    def addEvents(self, events):
        """
        Add a new list of events to the schedule.

        Events that have already ended are skipped; events already in
        progress are notified as started.

        @param events: the new events
        @type  events: a sequence of Event
        """
        # Event times always carry a tzinfo, so "now" must be aware as
        # well; comparing against a naive datetime raises TypeError.
        now = datetime.now(LOCAL)
        for event in events:
            if event.end > now:
                if self.events.insert(event):
                    if event.start < now:
                        self._eventStarted(event)
        if events:
            self._reschedule()

    def replaceEvents(self, events):
        """Replace the set of events in the scheduler.

        This function is different than simply removing all events then
        adding new ones, because it tries to avoid spurious
        stopped/start notifications.

        @param events: the new events
        @type  events: a sequence of Event
        """
        now = datetime.now(LOCAL)
        self.events = EventStore(events)
        current = []
        for event in self.events:
            if now < event.start:
                break
            elif event.end < now:
                # yay functional trees: we don't modify the iterator
                self.events.delete(event)
            else:
                current.append(event)
        # Iterate over a copy: _eventStopped removes from self.current.
        for event in self.current[:]:
            if event not in current:
                self._eventStopped(event)
        for event in current:
            if event not in self.current:
                self._eventStarted(event)
        # self.current keeps notification order, which need not match
        # the store's order, so compare the two order-insensitively.
        assert sorted(self.current) == sorted(current)
        self._reschedule()

    def subscribe(self, eventStarted, eventStopped):
        """Subscribe to event happenings in the scheduler.

        @param eventStarted: Function that will be called when an event
                             starts.
        @type  eventStarted: Event -> None
        @param eventStopped: Function that will be called when an event
                             stops.
        @type  eventStopped: Event -> None

        @returns: A subscription ID that can later be passed to
                  unsubscribe().
        """
        sid = self._subscribeId
        self._subscribeId += 1
        self.subscribers[sid] = (eventStarted, eventStopped)
        return sid

    def unsubscribe(self, id):
        """Unsubscribe from event happenings in the scheduler.

        @param id: Subscription ID received from subscribe()
        """
        del self.subscribers[id]

    def _eventStarted(self, event):
        """Mark event as current and call every eventStarted callback."""
        self.current.append(event)
        # Snapshot the callbacks so one of them may (un)subscribe.
        for started, _ in list(self.subscribers.values()):
            started(event)

    def _eventStopped(self, event):
        """Drop event from current and call every eventStopped callback."""
        self.current.remove(event)
        # Snapshot the callbacks so one of them may (un)subscribe.
        for _, stopped in list(self.subscribers.values()):
            stopped(event)

    def _reschedule(self):
        """
        Cancel the pending reactor call, if any, and schedule a new one
        for whichever comes first: the next event start or the next
        event stop. No call is scheduled when there is neither.
        """
        def _getNextStart():
            # First stored event that has not started yet.
            for event in self.events:
                if event not in self.current:
                    return event
            return None

        def _getNextStop():
            # Current event with the earliest end time.
            t = None
            e = None
            for event in self.current:
                if not t or event.end < t:
                    t = event.end
                    e = event
            return e

        def doStart(e):
            self._eventStarted(e)
            self._reschedule()

        def doStop(e):
            self._eventStopped(e)
            self.events.delete(e)
            # A recurring event is replaced by its next occurrence.
            new = e.reschedule()
            if new:
                self.events.insert(new)
            self._reschedule()

        if self._delayedCall:
            if self._delayedCall.active():
                self._delayedCall.cancel()
            self._delayedCall = None

        start = _getNextStart()
        stop = _getNextStop()
        now = datetime.now(LOCAL)

        def toSeconds(td):
            # Clamp to zero so an overdue action fires immediately.
            return max(td.days*24*3600 + td.seconds + td.microseconds/1e6, 0)

        if start is not None and (stop is None or start.start < stop.end):
            dc = reactor.callLater(toSeconds(start.start - now),
                                   doStart, start)
        elif stop is not None:
            dc = reactor.callLater(toSeconds(stop.end - now),
                                   doStop, stop)
        else:
            dc = None

        self._delayedCall = dc
class ICalScheduler(Scheduler):
    """
    I am a scheduler that takes its data from an ical file.
    """

    def __init__(self, fileObj):
        """
        Load the events from fileObj and, if it has a name, watch that
        file so the events are reloaded when it changes.

        @param fileObj: source of the ical data; it is read but not
                        closed here
        @type  fileObj: file-like object
        """
        from icalendar import Calendar

        Scheduler.__init__(self)

        def parseCalendarFromFile(f):
            cal = Calendar.from_string(f.read())
            events = self.parseCalendar(cal)
            self.replaceEvents(events)
        parseCalendarFromFile(fileObj)

        if hasattr(fileObj, 'name'):
            def fileChanged(f):
                # f is handed straight to open(), so it is presumably
                # the path of the changed file. Close the handle once
                # parsed so a reload does not leak a descriptor.
                handle = open(f, 'r')
                try:
                    parseCalendarFromFile(handle)
                finally:
                    handle.close()
            self.watcher = watcher.FilesWatcher([fileObj.name])
            self.watcher.subscribe(fileChanged=fileChanged)
            self.watcher.start()

    def parseCalendar(self, cal):
        """
        Take a Calendar object and return a list of
        Event objects.

        Entries lacking a start or an end, and entries that raise while
        being converted, are logged and skipped.

        @param cal: The calendar to "parse"
        @type  cal: icalendar.Calendar

        @rtype: List of {flumotion.component.base.scheduler.Event}
        """
        events = []
        for event in cal.walk('vevent'):
            # Best effort: one bad entry must not discard the others.
            try:
                start = event.decoded('dtstart', None)
                end = event.decoded('dtend', None)
                summary = event.decoded('summary', None)
                recur = event.get('rrule', None)
                if start and end:
                    self.debug("start %r tzname %s end %r recur %r", start,
                               start.tzname(), end, recur)
                    if recur:
                        e = Event(start, end, summary, recur.ical())
                    else:
                        e = Event(start, end, summary)
                    events.append(e)
                else:
                    self.warning('ical has event without start or end: '
                                 '%r', event)
            except Exception:
                self.warning("could not parse ical event %r", event)
        return events
411