diff --git a/src/allmydata/test/test_observer.py b/src/allmydata/test/test_observer.py index 0db13db58..73eece98e 100644 --- a/src/allmydata/test/test_observer.py +++ b/src/allmydata/test/test_observer.py @@ -101,3 +101,43 @@ class Observer(unittest.TestCase): d.addCallback(_step2) d.addCallback(_check2) return d + + def test_observer_list_reentrant(self): + """ + ``ObserverList`` is reentrant. + """ + observed = [] + + def observer_one(): + obs.unsubscribe(observer_one) + + def observer_two(): + observed.append(None) + + obs = observer.ObserverList() + obs.subscribe(observer_one) + obs.subscribe(observer_two) + obs.notify() + + self.assertEqual([None], observed) + + def test_observer_list_observer_errors(self): + """ + An error in an earlier observer does not prevent notification from being + delivered to a later observer. + """ + observed = [] + + def observer_one(): + raise Exception("Some problem here") + + def observer_two(): + observed.append(None) + + obs = observer.ObserverList() + obs.subscribe(observer_one) + obs.subscribe(observer_two) + obs.notify() + + self.assertEqual([None], observed) + self.assertEqual(1, len(self.flushLoggedErrors(Exception))) diff --git a/src/allmydata/util/observer.py b/src/allmydata/util/observer.py index 432aabb87..ad55e65a5 100644 --- a/src/allmydata/util/observer.py +++ b/src/allmydata/util/observer.py @@ -16,6 +16,9 @@ if PY2: import weakref from twisted.internet import defer from foolscap.api import eventually +from twisted.logger import ( + Logger, +) """The idiom we use is for the observed object to offer a method named 'when_something', which returns a deferred. That deferred will be fired when @@ -97,7 +100,10 @@ class LazyOneShotObserverList(OneShotObserverList): self._fire(self._get_result()) class ObserverList(object): - """A simple class to distribute events to a number of subscribers.""" + """ + Immediately distribute events to a number of subscribers. + """ + _logger = Logger() def __init__(self): self._watchers = [] @@ -109,8 +115,11 @@ class ObserverList(object): self._watchers.remove(observer) def notify(self, *args, **kwargs): - for o in self._watchers: - eventually(o, *args, **kwargs) + for o in self._watchers[:]: + try: + o(*args, **kwargs) + except: + self._logger.failure("While notifying {o!r}", o=o) class EventStreamObserver(object): """A simple class to distribute multiple events to a single subscriber.