util/observer.py: add EventStreamObserver

This commit is contained in:
Brian Warner 2010-08-04 00:26:12 -07:00
parent cbcb728e7e
commit 88d7ec2d54
1 changed files with 45 additions and 0 deletions

View File

@ -1,5 +1,6 @@
# -*- test-case-name: allmydata.test.test_observer -*-
import weakref
from twisted.internet import defer
from foolscap.api import eventually
@ -91,3 +92,47 @@ class ObserverList:
def notify(self, *args, **kwargs):
for o in self._watchers:
eventually(o, *args, **kwargs)
class EventStreamObserver:
"""A simple class to distribute multiple events to a single subscriber.
It accepts arbitrary kwargs, but no posargs."""
def __init__(self):
self._watcher = None
self._undelivered_results = []
self._canceler = None
def set_canceler(self, c, methname):
"""I will call c.METHNAME(self) when somebody cancels me."""
# we use a weakref to avoid creating a cycle between us and the thing
# we're observing: they'll be holding a reference to us to compare
# against the value we pass to their canceler function. However,
# since bound methods are first-class objects (and not kept alive by
# the object they're bound to), we can't just stash a weakref to the
# bound cancel method. Instead, we must hold a weakref to the actual
# object, and obtain its cancel method later.
# http://code.activestate.com/recipes/81253-weakmethod/ has an
# alternative.
self._canceler = (weakref.ref(c), methname)
def subscribe(self, observer, **watcher_kwargs):
self._watcher = (observer, watcher_kwargs)
while self._undelivered_results:
self._notify(self._undelivered_results.pop(0))
def notify(self, **result_kwargs):
if self._watcher:
self._notify(result_kwargs)
else:
self._undelivered_results.append(result_kwargs)
def _notify(self, result_kwargs):
o, watcher_kwargs = self._watcher
kwargs = dict(result_kwargs)
kwargs.update(watcher_kwargs)
eventually(o, **kwargs)
def cancel(self):
wr,methname = self._canceler
o = wr()
if o:
getattr(o,methname)(self)