Make ObserverList synchronous, reentrant, and exception safe

with tests
This commit is contained in:
Jean-Paul Calderone 2020-12-16 16:19:33 -05:00
parent 3513e9b4fc
commit 60e401ca69
2 changed files with 52 additions and 3 deletions

View File

@ -101,3 +101,43 @@ class Observer(unittest.TestCase):
d.addCallback(_step2) d.addCallback(_step2)
d.addCallback(_check2) d.addCallback(_check2)
return d return d
def test_observer_list_reentrant(self):
"""
``ObserverList`` is reentrant.
"""
observed = []
def observer_one():
obs.unsubscribe(observer_one)
def observer_two():
observed.append(None)
obs = observer.ObserverList()
obs.subscribe(observer_one)
obs.subscribe(observer_two)
obs.notify()
self.assertEqual([None], observed)
def test_observer_list_observer_errors(self):
"""
An error in an earlier observer does not prevent notification from being
delivered to a later observer.
"""
observed = []
def observer_one():
raise Exception("Some problem here")
def observer_two():
observed.append(None)
obs = observer.ObserverList()
obs.subscribe(observer_one)
obs.subscribe(observer_two)
obs.notify()
self.assertEqual([None], observed)
self.assertEqual(1, len(self.flushLoggedErrors(Exception)))

View File

@ -16,6 +16,9 @@ if PY2:
import weakref import weakref
from twisted.internet import defer from twisted.internet import defer
from foolscap.api import eventually from foolscap.api import eventually
from twisted.logger import (
Logger,
)
"""The idiom we use is for the observed object to offer a method named """The idiom we use is for the observed object to offer a method named
'when_something', which returns a deferred. That deferred will be fired when 'when_something', which returns a deferred. That deferred will be fired when
@ -97,7 +100,10 @@ class LazyOneShotObserverList(OneShotObserverList):
self._fire(self._get_result()) self._fire(self._get_result())
class ObserverList(object): class ObserverList(object):
"""A simple class to distribute events to a number of subscribers.""" """
Immediately distribute events to a number of subscribers.
"""
_logger = Logger()
def __init__(self): def __init__(self):
self._watchers = [] self._watchers = []
@ -109,8 +115,11 @@ class ObserverList(object):
self._watchers.remove(observer) self._watchers.remove(observer)
def notify(self, *args, **kwargs): def notify(self, *args, **kwargs):
for o in self._watchers: for o in self._watchers[:]:
eventually(o, *args, **kwargs) try:
o(*args, **kwargs)
except:
self._logger.failure("While notifying {o!r}", o=o)
class EventStreamObserver(object): class EventStreamObserver(object):
"""A simple class to distribute multiple events to a single subscriber. """A simple class to distribute multiple events to a single subscriber.