Added wxPython.lib.evtmgr by Rob Schecter, which is an easier, more

"Pythonic" and more OO method of registering handlers for wxWindows
events using the Publish/Subscribe pattern.


git-svn-id: https://svn.wxwidgets.org/svn/wx/wxWidgets/branches/WX_2_4_BRANCH@18214 c3d73ce0-8a6f-49c7-b76d-6d57e0e08775
This commit is contained in:
Robin Dunn
2002-12-13 22:01:38 +00:00
parent 522b6f2994
commit 13b14ee5ad
3 changed files with 886 additions and 0 deletions

View File

@@ -54,6 +54,10 @@ Added Throbber from Cliff Wells to the library and the demo.
Windows installer prompts to uninstall old version first.
Added wxPython.lib.evtmgr by Rob Schecter, which is an easier, more
"Pythonic" and more OO method of registering handlers for wxWindows
events using the Publish/Subscribe pattern.

View File

@@ -0,0 +1,505 @@
#---------------------------------------------------------------------------
# Name: wxPython.lib.evtmgr
# Purpose: An easier, more "Pythonic" and more OO method of registering
# handlers for wxWindows events using the Publish/Subscribe
# pattern.
#
# Author: Rob Schecter and Robin Dunn
#
# Created: 12-December-2002
# RCS-ID: $Id$
# Copyright: (c) 2002 by Rob Schecter
# Licence: wxWindows license
#---------------------------------------------------------------------------
"""
A module that allows multiple handlers to respond to single wxWindows
events. This allows true NxN Observer/Observable connections: One
event can be received by multiple handlers, and one handler can
receive multiple events.
There are two ways to register event handlers. The first way is
similar to standard wxPython handler registration:
from wxPython.lib.evtmgr import eventManager
eventManager.Register(handleEvents, EVT_BUTTON, win=frame, id=101)
There's also a new object-oriented way to register for events. This
invocation is equivalent to the one above, but does not require the
programmer to declare or track control ids or parent containers:
eventManager.register(handleEvents, EVT_BUTTON, myButton)
This module is Python 2.1+ compatible.
Author: Robb Shecter
"""
import pubsub
#---------------------------------------------------------------------------
class EventManager:
"""
This is the main class in the module, and is the only class that
the application programmer needs to use. There is a pre-created
instance of this class called 'eventManager'. It should not be
necessary to create other instances.
"""
def __init__(self):
self.eventAdapterDict = {}
self.messageAdapterDict = {}
self.windowTopicLookup = {}
self.listenerTopicLookup = {}
self.__publisher = pubsub.Publisher()
self.EMPTY_LIST = []
def Register(self, listener, event, source=None, win=None, id=None):
"""
Registers a listener function (or any callable object) to
receive events of type event coming from the source window.
For example:
eventManager.Register(self.OnButton, EVT_BUTTON, theButton)
Alternatively, the specific window where the event is
delivered, and/or the ID of the event source can be specified.
For example:
eventManager.Register(self.OnButton, EVT_BUTTON, win=self, id=ID_BUTTON)
or
eventManager.Register(self.OnButton, EVT_BUTTON, theButton, self)
"""
# 1. Check if the 'event' is actually one of the multi-
# event macros.
if _macroInfo.isMultiEvent(event):
raise 'Cannot register the macro, '+`event`+'. Register instead the individual events.'
# Support a more OO API. This allows the GUI widget itself to
# be specified, and the id to be retrieved from the system,
# instead of kept track of explicitly by the programmer.
# (Being used to doing GUI work with Java, this seems to me to be
# the natural way of doing things.)
if source is not None:
id = source.GetId()
if win is None:
win = source
topic = (event, win, id)
# Create an adapter from the PS system back to wxEvents, and
# possibly one from wxEvents:
if not self.__haveMessageAdapter(listener, topic):
messageAdapter = MessageAdapter(eventHandler=listener, topicPattern=topic)
try:
self.messageAdapterDict[topic][listener] = messageAdapter
except KeyError:
self.messageAdapterDict[topic] = {}
self.messageAdapterDict[topic][listener] = messageAdapter
if not self.eventAdapterDict.has_key(topic):
self.eventAdapterDict[topic] = EventAdapter(event, win, id)
else:
# Throwing away a duplicate request
pass
# For time efficiency when deregistering by window:
try:
self.windowTopicLookup[win].append(topic)
except KeyError:
self.windowTopicLookup[win] = []
self.windowTopicLookup[win].append(topic)
# For time efficiency when deregistering by listener:
try:
self.listenerTopicLookup[listener].append(topic)
except KeyError:
self.listenerTopicLookup[listener] = []
self.listenerTopicLookup[listener].append(topic)
# See if the source understands the listeningFor protocol.
# This is a bit of a test I'm working on - it allows classes
# to know when their events are being listened to. I use
# it to enable chaining events from contained windows only
# when needed.
if source is not None:
try:
# Let the source know that we're listening for this
# event.
source.listeningFor(event)
except AttributeError:
pass
def DeregisterWindow(self, win):
"""
Deregister all events coming from the given window.
"""
topics = self.__getTopics(win)
if topics:
for aTopic in topics:
self.__deregisterTopic(aTopic)
del self.windowTopicLookup[win]
def DeregisterListener(self, listener):
"""
Deregister all event notifications for the given listener.
"""
try:
topicList = self.listenerTopicLookup[listener]
except KeyError:
return
for topic in topicList:
topicDict = self.messageAdapterDict[topic]
if topicDict.has_key(listener):
topicDict[listener].Destroy()
del topicDict[listener]
if len(topicDict) == 0:
self.eventAdapterDict[topic].Destroy()
del self.eventAdapterDict[topic]
del self.messageAdapterDict[topic]
del self.listenerTopicLookup[listener]
def GetStats(self):
"""
Return a dictionary with data about
my state.
"""
stats = {}
stats['Adapters: Message'] = reduce(lambda x,y: x+y, map(len, self.messageAdapterDict.values()))
stats['Adapters: Event'] = len(self.eventAdapterDict)
stats['Topics: Total'] = len(self.__getTopics())
stats['Topics: Dead'] = len(self.GetDeadTopics())
return stats
def DeregisterDeadTopics(self):
"""
Deregister any entries relating to dead
wxPython objects. Not sure if this is an
important issue; 1) My app code always de-registers
listeners it doesn't need. 2) I don't think
that lingering references to these dead objects
is a problem.
"""
for topic in self.GetDeadTopics():
self.DeregisterTopic(topic)
def GetDeadTopics(self):
"""
Return a list of topics relating to dead wxPython
objects.
"""
return filter(self.__isDeadTopic, self.__getTopics())
def __winString(self, aWin):
"""
A string rep of a window for debugging
"""
from wxPython.wx import wxPyDeadObjectError
try:
name = aWin.GetClassName()
i = id(aWin)
return '%s #%d' % (name, i)
except wxPyDeadObjectError:
return '(dead wxObject)'
def __topicString(self, aTopic):
"""
A string rep of a topic for debugging
"""
return '[%-26s %s]' % (aTopic[0].__name__, self.winString(aTopic[1]))
def __listenerString(self, aListener):
"""
A string rep of a listener for debugging
"""
try:
return aListener.im_class.__name__ + '.' + aListener.__name__
except:
return 'Function ' + aListener.__name__
def __deregisterTopic(self, aTopic):
try:
messageAdapterList = self.messageAdapterDict[aTopic].values()
except KeyError:
# This topic isn't valid. Probably because it was deleted
# by listener.
return
for messageAdapter in messageAdapterList:
messageAdapter.Destroy()
self.eventAdapterDict[aTopic].Destroy()
del self.messageAdapterDict[aTopic]
del self.eventAdapterDict[aTopic]
def __getTopics(self, win=None):
if win is None:
return self.messageAdapterDict.keys()
if win is not None:
try:
return self.windowTopicLookup[win]
except KeyError:
return self.EMPTY_LIST
def __isDeadWxObject(self, anObject):
from wxPython.wx import _wxPyDeadObject
return isinstance(anObject, _wxPyDeadObject)
def __isDeadTopic(self, aTopic):
return self.__isDeadWxObject(aTopic[1])
def __haveMessageAdapter(self, eventHandler, topicPattern):
"""
Return true if there's already a message adapter
with these specs.
"""
try:
return self.messageAdapterDict[topicPattern].has_key(eventHandler)
except KeyError:
return 0
#---------------------------------------------------------------------------
# From here down is implementaion and support classes, although you may
# find some of them useful in other contexts.
#---------------------------------------------------------------------------
class EventMacroInfo:
"""
A class that provides information about event macros.
"""
def __init__(self):
self.lookupTable = {}
def getEventTypes(self, eventMacro):
"""
Return the list of event types that the given
macro corresponds to.
"""
try:
return self.lookupTable[eventMacro]
except KeyError:
win = FakeWindow()
try:
eventMacro(win, None, None)
except TypeError:
eventMacro(win, None)
self.lookupTable[eventMacro] = win.eventTypes
return win.eventTypes
def eventIsA(self, event, macroList):
"""
Return true if the event is one of the given
macros.
"""
eventType = event.GetEventType()
for macro in macroList:
if eventType in self.getEventTypes(macro):
return 1
return 0
def macroIsA(self, macro, macroList):
"""
Return true if the macro is in the macroList.
The added value of this method is that it takes
multi-events into account. The macroList parameter
will be coerced into a sequence if needed.
"""
if callable(macroList):
macroList = (macroList,)
testList = self.getEventTypes(macro)
eventList = []
for m in macroList:
eventList.extend(self.getEventTypes(m))
# Return true if every element in testList is in eventList
for element in testList:
if element not in eventList:
return 0
return 1
def isMultiEvent(self, macro):
"""
Return true if the given macro actually causes
multiple events to be registered.
"""
return len(self.getEventTypes(macro)) > 1
#---------------------------------------------------------------------------
class FakeWindow:
"""
Used internally by the EventMacroInfo class. The
FakeWindow is the most important component of the
macro-info utility: it implements the Connect()
protocol of wxWindow, but instead of registering
for events, it keeps track of what parameters where
passed to it.
"""
def __init__(self):
self.eventTypes = []
def Connect(self, id1, id2, eventType, handlerFunction):
self.eventTypes.append(eventType)
#---------------------------------------------------------------------------
class EventAdapter:
"""
A class that adapts incoming wxWindows events to
Publish/Subscribe messages.
In other words, this is the object that's seen by the
wxWindows system. Only one of these registers for any
particular wxWindows event. It then relays it into the
PS system, which lets many listeners respond.
"""
def __init__(self, func, win, id):
"""
Instantiate a new adapter. Pre-compute my Publish/Subscribe
topic, which is constant, and register with wxWindows.
"""
self.publisher = pubsub.Publisher()
self.topic = ((func, win, id),)
self.id = id
self.win = win
self.eventType = _macroInfo.getEventTypes(func)[0]
# Register myself with the wxWindows event system
try:
func(win, id, self.handleEvent)
self.callStyle = 3
except TypeError:
func(win, self.handleEvent)
self.callStyle = 2
def disconnect(self):
if self.callStyle == 3:
return self.win.Disconnect(self.id, -1, self.eventType)
else:
return self.win.Disconnect(-1, -1, self.eventType)
def handleEvent(self, event):
"""
In response to a wxWindows event, send a PS message
"""
self.publisher.sendMessage(topic=self.topic, data=event)
def Destroy(self):
from wxPython.wx import wxPyDeadObjectError
try:
if not self.disconnect():
print 'disconnect failed'
except wxPyDeadObjectError:
print 'disconnect failed: dead object' ##????
#---------------------------------------------------------------------------
class MessageAdapter:
"""
A class that adapts incoming Publish/Subscribe messages
to wxWindows event calls.
This class works opposite the EventAdapter, and
retrieves the information an EventAdapter has sent in a message.
Strictly speaking, this class is not required: Event listeners
could pull the original wxEvent object out of the PS Message
themselves.
However, by pairing an instance of this class with each wxEvent
handler, the handlers can use the standard API: they receive an
event as a parameter.
"""
def __init__(self, eventHandler, topicPattern):
"""
Instantiate a new MessageAdapter that send wxEvents to the
given eventHandler.
"""
self.eventHandler = eventHandler
pubsub.Publisher().subscribe(listener=self.notify, topic=(topicPattern,))
def notify(self, message):
event = message.data # Extract the wxEvent
self.eventHandler(event) # Perform the call as wxWindows would
event.Skip(1) # Make sure Skip(1) wasn't set. ##????
def Destroy(self):
pubsub.Publisher().unsubscribe(listener=self.notify)
#---------------------------------------------------------------------------
# Create globals
_macroInfo = EventMacroInfo()
# For now a singleton is not enforced. Should it be or can we trust
# the programmers?
eventManager = EventManager()
#---------------------------------------------------------------------------
# simple test code
if __name__ == '__main__':
from wxPython.wx import wxPySimpleApp, wxFrame, wxToggleButton, wxBoxSizer, wxHORIZONTAL, EVT_MOTION, EVT_LEFT_DOWN, EVT_TOGGLEBUTTON, wxALL
app = wxPySimpleApp()
frame = wxFrame(None, -1, 'Event Test', size=(300,300))
button = wxToggleButton(frame, -1, 'Listen for Mouse Events')
sizer = wxBoxSizer(wxHORIZONTAL)
sizer.Add(button, 0, 0 | wxALL, 10)
frame.SetAutoLayout(1)
frame.SetSizer(sizer)
#
# Demonstrate 1) register/deregister, 2) Multiple listeners receiving
# one event, and 3) Multiple events going to one listener.
#
def handleEvents(event):
print event.GetClassName(), event.GetTimestamp()
def enableFrameEvents(event):
# Turn the output of mouse events on and off
if event.IsChecked():
print '\nEnabling mouse events...'
eventManager.Register(handleEvents, EVT_MOTION, frame)
eventManager.Register(handleEvents, EVT_LEFT_DOWN, frame)
else:
print '\nDisabling mouse events...'
eventManager.DeregisterWindow(frame)
# Send togglebutton events to both the on/off code as well
# as the function that prints to stdout.
eventManager.Register(handleEvents, EVT_TOGGLEBUTTON, button)
eventManager.Register(enableFrameEvents, EVT_TOGGLEBUTTON, button)
frame.CenterOnScreen()
frame.Show(1)
app.MainLoop()

View File

@@ -0,0 +1,377 @@
#---------------------------------------------------------------------------
# Name: wxPython.lib.pubsub
# Purpose: The Publish/Subscribe framework used by evtmgr.EventManager
#
# Author: Rob Schecter and Robin Dunn
#
# Created: 12-December-2002
# RCS-ID: $Id$
# Copyright: (c) 2002 by Rob Schecter
# Licence: wxWindows license
#---------------------------------------------------------------------------
"""
This module has classes for implementing the Publish/Subscribe design
pattern.
It's a very flexible PS implementation: The message topics are tuples
of any length, containing any objects (that can be used as hash keys).
A subscriber's topic matches any message topic for which it's a
sublist.
It also has many optimizations to favor time efficiency (ie., run-time
speed). I did this because I use it to support extreme uses. For
example, piping every wxWindows mouse event through to multiple
listeners, and expecting the app to have no noticeable slowdown. This
has made the code somewhat obfuscated, but I've done my best to
document it.
The Server and Message classes are the two that clients interact
with..
This module is compatible with Python 2.1.
Author: Robb Shecter
"""
#---------------------------------------------------------------------------
class Publisher:
"""
The publish/subscribe server. This class is a Singleton.
"""
def __init__(self):
self.topicDict = {}
self.functionDict = {}
self.subscribeAllList = []
self.messageCount = 0
self.deliveryCount = 0
#
# Public API
#
def subscribe(self, topic, listener):
"""
Add the given subscription to the list. This will
add an entry recording the fact that the listener wants
to get messages for (at least) the given topic. This
method may be called multiple times for one listener,
registering it with many topics. It can also be invoked
many times for a particular topic, each time with a
different listener.
listener: expected to be either a method or function that
takes zero or one parameters. (Not counting 'self' in the
case of methods. If it accepts a parameter, it will be given
a reference to a Message object.
topic: will be converted to a tuple if it isn't one.
It's a pattern matches any topic that it's a sublist
of. For example, this pattern:
('sports',)
would match these:
('sports',)
('sports', 'baseball')
('sports', 'baseball', 'highscores')
but not these:
()
('news')
(12345)
"""
if not callable(listener):
raise TypeError('The P/S listener, '+`listener`+', is not callable.')
aTopic = Topic(topic)
# Determine now (at registration time) how many parameters
# the listener expects, and get a reference to a function which
# calls it correctly at message-send time.
callableVersion = self.__makeCallable(listener)
# Add this tuple to a list which is in a dict keyed by
# the topic's first element.
self.__addTopicToCorrectList(aTopic, listener, callableVersion)
# Add to a dict in order to speed-up unsubscribing.
self.__addFunctionLookup(listener, aTopic)
def unsubscribe(self, listener):
"""
Remove the given listener from the registry,
for all topics that it's associated with.
"""
if not callable(listener):
raise TypeError('The P/S listener, '+`listener`+', is not callable.')
topicList = self.getAssociatedTopics(listener)
for aTopic in topicList:
subscriberList = self.__getTopicList(aTopic)
listToKeep = []
for subscriber in subscriberList:
if subscriber[0] != listener:
listToKeep.append(subscriber)
self.__setTopicList(aTopic, listToKeep)
self.__delFunctionLookup(listener)
def getAssociatedTopics(self, listener):
"""
Return a list of topics the given listener is
registered with.
"""
return self.functionDict.get(listener, [])
def sendMessage(self, topic, data=None):
"""
Relay a message to registered listeners.
"""
aTopic = Topic(topic)
message = Message(aTopic.items, data)
topicList = self.__getTopicList(aTopic)
# Send to the matching topics
for subscriber in topicList:
if subscriber[1].matches(aTopic):
subscriber[2](message)
# Send to any listeners registered for ALL
for subscriber in self.subscribeAllList:
subscriber[2](message)
#
# Private methods
#
def __makeCallable(self, function):
"""
Return a function that is what the server
will actually call.
This is a time optimization: this removes a test
for the number of parameters from the inner loop
of sendMessage().
"""
parameters = self.__parameterCount(function)
if parameters == 0:
# Return a function that calls the listener
# with no arguments.
return lambda m, f=function: f()
elif parameters == 1:
# Return a function that calls the listener
# with one argument (which will be the message).
return lambda m, f=function: f(m)
else:
raise TypeError('The publish/subscribe listener, '+`function`+', has wrong parameter count')
def __parameterCount(self, callableObject):
"""
Return the effective number of parameters required
by the callable object. In other words, the 'self'
parameter of methods is not counted.
"""
try:
# Try to handle this like a method
return callableObject.im_func.func_code.co_argcount - 1
except AttributeError:
pass
try:
# Try to handle this like a function
return callableObject.func_code.co_argcount
except AttributeError:
raise 'Cannot determine if this is a method or function: '+str(callableObject)
def __addFunctionLookup(self, aFunction, aTopic):
try:
aList = self.functionDict[aFunction]
except KeyError:
aList = []
self.functionDict[aFunction] = aList
aList.append(aTopic)
def __delFunctionLookup(self, aFunction):
try:
del self.functionDict[aFunction]
except KeyError:
print 'Warning: listener not found. Logic error in PublishSubscribe?', aFunction
def __addTopicToCorrectList(self, topic, listener, callableVersion):
if len(topic.items) == 0:
self.subscribeAllList.append((listener, topic, callableVersion))
else:
self.__getTopicList(topic).append((listener, topic, callableVersion))
def __getTopicList(self, aTopic):
"""
Return the correct sublist of subscribers based on the
given topic.
"""
try:
elementZero = aTopic.items[0]
except IndexError:
return self.subscribeAllList
try:
subList = self.topicDict[elementZero]
except KeyError:
subList = []
self.topicDict[elementZero] = subList
return subList
def __setTopicList(self, aTopic, aSubscriberList):
try:
self.topicDict[aTopic.items[0]] = aSubscriberList
except IndexError:
self.subscribeAllList = aSubscriberList
def __call__(self):
return self
# Create an instance with the same name as the class, effectivly
# hiding the class object so it can't be instantiated any more. From
# this point forward any calls to Publisher() will invoke the __call__
# of this instance which just returns itself.
Publisher = Publisher()
#---------------------------------------------------------------------------
class Message:
"""
A simple container object for the two components of
a message; the topic and the data.
"""
def __init__(self, topic, data):
self.topic = topic
self.data = data
def __str__(self):
return '[Topic: '+`self.topic`+', Data: '+`self.data`+']'
#---------------------------------------------------------------------------
class Topic:
"""
A class that represents a publish/subscribe topic.
Currently, it's only used internally in the framework; the
API expects and returns plain old tuples.
It currently exists mostly as a place to keep the matches()
function. This function, though, could also correctly be
seen as an attribute of the P/S server. Getting rid of this
class would also mean one fewer object instantiation per
message send.
"""
listType = type([])
tupleType = type(())
def __init__(self, items):
# Make sure we have a tuple.
if type(items) == self.__class__.listType:
items = tuple(items)
elif type(items) != self.__class__.tupleType:
items = (items,)
self.items = items
self.length = len(items)
def matches(self, aTopic):
"""
Consider myself to be a topic pattern,
and return true if I match the given specific
topic. For example,
a = ('sports')
b = ('sports','baseball')
a.matches(b) --> 1
b.matches(a) --> 0
"""
# The question this method answers is equivalent to;
# is my list a sublist of aTopic's? So, my algorithm
# is: 1) make a copy of the aTopic list which is
# truncated to the pattern's length. 2) Test for
# equality.
#
# This algorithm may be somewhat memory-intensive,
# because it creates a temporary list on each
# call to match. A possible to-do would be to
# re-write this with a hand-coded loop.
return (self.items == aTopic.items[:self.length])
def __repr__(self):
import string
return '<Topic>' + string.join(map(repr, self.items), ', ') + '</Topic>'
def __eq__(self, aTopic):
"""
Return true if I equal the given topic. We're considered
equal if our tuples are equal.
"""
if type(self) != type(aTopic):
return 0
else:
return self.items == aTopic.items
def __ne__(self, aTopic):
"""
Return false if I equal the given topic.
"""
return not self == aTopic
#---------------------------------------------------------------------------
#
# Code for a simple command-line test
#
if __name__ == '__main__':
class SimpleListener:
def __init__(self, number):
self.number = number
def notify(self, message):
print '#'+str(self.number)+' got the message:', message
# Build a list of ten listeners.
lList = []
for x in range(10):
lList.append(SimpleListener(x))
server = Publisher()
# Everyone's interested in politics...
for x in lList:
Publisher().subscribe(topic='politics', listener=x.notify) # also tests singleton
# But only the first four are interested in trivia.
for x in lList[:4]:
server.subscribe(topic='trivia', listener=x.notify)
# This one subscribes to everything.
everythingListener = SimpleListener(999)
server.subscribe(topic=(), listener=everythingListener.notify)
# Now send out two messages, testing topic matching.
server.sendMessage(topic='trivia', data='What is the capitol of Oregon?')
server.sendMessage(topic=('politics','germany'), data='The Greens have picked up another seat in the Bundestag.')
#---------------------------------------------------------------------------