git-svn-id: https://svn.wxwidgets.org/svn/wx/wxWidgets/trunk@41354 c3d73ce0-8a6f-49c7-b76d-6d57e0e08775
		
			
				
	
	
		
			413 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			413 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""
 | 
						|
This module supports the thread-safe, asynchronous transmission of data 
 | 
						|
('delayed results') from a worker (non-GUI) thread to the main thread. Ie you don't 
 | 
						|
need to mutex lock any data, the worker thread doesn't wait (or even check) 
 | 
						|
for the result to be received, and the main thread doesn't wait for the 
 | 
						|
worker thread to send the result. Instead, the consumer will be called 
 | 
						|
automatically by the wx app when the worker thread result is available. 
 | 
						|
 | 
						|
In most cases you just need to use startWorker() with the correct parameters
 | 
						|
(your worker function and your 'consumer' in the simplest of cases). The 
 | 
						|
only requirement on consumer is that it must accept a DelayedResult instance 
 | 
						|
as first arg. 
 | 
						|
 | 
						|
In the following example, this will call consumer(delayedResult) with the 
 | 
						|
return value from workerFn::
 | 
						|
 | 
						|
    from delayedresult import startWorker
 | 
						|
    startWorker(consumer, workerFn)
 | 
						|
 | 
						|
More advanced uses: 
 | 
						|
 | 
						|
- The other parameters to startWorker()
 | 
						|
- Derive from Producer to override _extraInfo (e.g. to provide traceback info)
 | 
						|
- Create your own worker-function-thread wrapper instead of using Producer
 | 
						|
- Create your own Handler-like wrapper to pre- or post-process the result 
 | 
						|
  (see PreProcessChain)
 | 
						|
- Derive from Sender to use your own way of making result hop over the 
 | 
						|
  "thread boundary" (from non-main thread to main thread), e.g. using Queue
 | 
						|
 | 
						|
Thanks to Josiah Carlson for critical feedback/ideas that helped me 
 | 
						|
improve this module. 
 | 
						|
 | 
						|
:Copyright: (c) 2006 by Oliver Schoenborn
 | 
						|
:License: wxWidgets license
 | 
						|
:Version: 1.0
 | 
						|
 | 
						|
"""
 | 
						|
 | 
						|
__author__  = 'Oliver Schoenborn at utoronto dot ca'
 | 
						|
__version__ = '1.0'
 | 
						|
 | 
						|
__all__ = ('Sender', 'SenderNoWx', 'SenderWxEvent', 'SenderCallAfter', 
 | 
						|
    'Handler', 'DelayedResult', 'Producer', 'startWorker', 'PreProcessChain')
 | 
						|
 | 
						|
 | 
						|
import wx
 | 
						|
import threading
 | 
						|
 | 
						|
 | 
						|
class Struct:
 | 
						|
    """
 | 
						|
    An object that has attributes built from the dictionary given in 
 | 
						|
    constructor. So ss=Struct(a=1, b='b') will satisfy assert ss.a == 1
 | 
						|
    and assert ss.b == 'b'.
 | 
						|
    """
 | 
						|
    
 | 
						|
    def __init__(self, **kwargs):
 | 
						|
        self.__dict__.update( kwargs )
 | 
						|
 | 
						|
 | 
						|
class Handler:
 | 
						|
    """
 | 
						|
    Bind some of the arguments and keyword arguments of a callable ('listener'). 
 | 
						|
    Then when the Handler instance is called (e.g. handler(result, **kwargs))
 | 
						|
    the result is passed as first argument to callable, the kwargs is 
 | 
						|
    combined with those given at construction, and the args are those
 | 
						|
    given at construction. Its return value is returned.
 | 
						|
    """
 | 
						|
    def __init__(self, listener, *args, **kwargs ):
 | 
						|
        """Bind args and kwargs to listener. """
 | 
						|
        self.__listener = listener
 | 
						|
        self.__args = args
 | 
						|
        self.__kwargs = kwargs
 | 
						|
        
 | 
						|
    def __call__(self, result, **moreKwargs):
 | 
						|
        """Listener is assumed to take result as first arg, then *args, 
 | 
						|
        then the combination of moreKwargs and the kwargs given at construction."""
 | 
						|
        if moreKwargs:
 | 
						|
            moreKwargs.update(self.__kwargs)
 | 
						|
        else:
 | 
						|
            moreKwargs = self.__kwargs
 | 
						|
        return self.__listener(result, *self.__args, **moreKwargs)
 | 
						|
 | 
						|
        
 | 
						|
class Sender:
 | 
						|
    """
 | 
						|
    Base class for various kinds of senders. A sender sends a result
 | 
						|
    produced by a worker funtion to a result handler (listener). Note
 | 
						|
    that each sender can be given a "job id". This can be anything
 | 
						|
    (number, string, id, and object, etc) and is not used, it is 
 | 
						|
    simply added as attribute whenever a DelayedResult is created. 
 | 
						|
    This allows you to know, if desired, what result corresponds to 
 | 
						|
    which sender. Note that uniqueness is not necessary. 
 | 
						|
    
 | 
						|
    Derive from this class if none of the existing derived classes
 | 
						|
    are adequate, and override _sendImpl(). 
 | 
						|
    """
 | 
						|
    
 | 
						|
    def __init__(self, jobID=None):
 | 
						|
        """The optional jobID can be anything that you want to use to 
 | 
						|
        track which sender particular results come from. """
 | 
						|
        self.__jobID = jobID
 | 
						|
 | 
						|
    def getJobID(self):
 | 
						|
        """Return the jobID given at construction"""
 | 
						|
        return self.__jobID
 | 
						|
    
 | 
						|
    def sendResult(self, result):
 | 
						|
        """This will send the result to handler, using whatever 
 | 
						|
        technique the derived class uses. """
 | 
						|
        delayedResult = DelayedResult(result, jobID=self.__jobID)
 | 
						|
        self._sendImpl(delayedResult)
 | 
						|
 | 
						|
    def sendException(self, exception, extraInfo = None):
 | 
						|
        """Use this when the worker function raised an exception.
 | 
						|
        The *exception* is the instance of Exception caught. The extraInfo
 | 
						|
        could be anything you want (e.g. locals or traceback etc), 
 | 
						|
        it will be added to the exception as attribute 'extraInfo'. The
 | 
						|
        exception will be raised when DelayedResult.get() is called."""
 | 
						|
        assert exception is not None
 | 
						|
        delayedResult = DelayedResult(extraInfo, 
 | 
						|
            exception=exception, jobID=self.__jobID)
 | 
						|
        self._sendImpl(delayedResult)
 | 
						|
 | 
						|
    def _sendImpl(self, delayedResult):
 | 
						|
        msg = '_sendImpl() must be implemented in %s' % self.__class__
 | 
						|
        raise NotImplementedError(msg)
 | 
						|
        
 | 
						|
    
 | 
						|
class SenderNoWx( Sender ):
 | 
						|
    """
 | 
						|
    Sender that works without wx. The results are sent directly, ie
 | 
						|
    the consumer will get them "in the worker thread". So it should 
 | 
						|
    only be used for testing. 
 | 
						|
    """
 | 
						|
    def __init__(self, consumer, jobID=None, args=(), kwargs={}):
 | 
						|
        """The consumer can be any callable of the form 
 | 
						|
        callable(result, *args, **kwargs)"""
 | 
						|
        Sender.__init__(self, jobID)
 | 
						|
        if args or kwargs:
 | 
						|
            self.__consumer = Handler(consumer, *args, **kwargs)
 | 
						|
        else: 
 | 
						|
            self.__consumer = consumer
 | 
						|
            
 | 
						|
    def _sendImpl(self, delayedResult):
 | 
						|
        self.__consumer(delayedResult)
 | 
						|
        
 | 
						|
 | 
						|
class SenderWxEvent( Sender ):
 | 
						|
    """
 | 
						|
    This sender sends the delayed result produced in the worker thread
 | 
						|
    to an event handler in the main thread, via a wx event of class 
 | 
						|
    *eventClass*. The result is an attribute of the event (default: 
 | 
						|
    "delayedResult". 
 | 
						|
    """
 | 
						|
    def __init__(self, handler, eventClass, resultAttr="delayedResult", 
 | 
						|
        jobID=None, **kwargs):
 | 
						|
        """The handler must derive from wx.EvtHandler. The event class 
 | 
						|
        is typically the first item in the pair returned by 
 | 
						|
        wx.lib.newevent.NewEvent(). You can use the *resultAttr* 
 | 
						|
        to change the attribute name of the generated event's 
 | 
						|
        delayed result. """
 | 
						|
        Sender.__init__(self, jobID)
 | 
						|
        if not isinstance(handler, wx.EvtHandler):
 | 
						|
            msg = 'SenderWxEvent(handler=%s, ...) not allowed,' % type(handler)
 | 
						|
            msg = '%s handler must derive from wx.EvtHandler' % msg
 | 
						|
            raise ValueError(msg)
 | 
						|
        self.__consumer = Struct(handler=handler, eventClass=eventClass, 
 | 
						|
                                 resultAttr=resultAttr, kwargs=kwargs)
 | 
						|
        
 | 
						|
    def _sendImpl(self, delayedResult):
 | 
						|
        """Must not modify the consumer (that was created at construction) 
 | 
						|
        since might be shared by several senders, each sending from 
 | 
						|
        separate threads."""
 | 
						|
        consumer = self.__consumer
 | 
						|
        kwargs = consumer.kwargs.copy()
 | 
						|
        kwargs[ consumer.resultAttr ] = delayedResult
 | 
						|
        event = consumer.eventClass(** kwargs)
 | 
						|
        wx.PostEvent(consumer.handler, event)
 | 
						|
 | 
						|
 | 
						|
class SenderCallAfter( Sender ):
 | 
						|
    """
 | 
						|
    This sender sends the delayed result produced in the worker thread
 | 
						|
    to a callable in the main thread, via wx.CallAfter. 
 | 
						|
    """
 | 
						|
    def __init__(self, listener, jobID=None, args=(), kwargs={}):
 | 
						|
        Sender.__init__(self, jobID)
 | 
						|
        if args or kwargs:
 | 
						|
            self.__consumer = Handler(listener, *args, **kwargs)
 | 
						|
        else:
 | 
						|
            self.__consumer = listener
 | 
						|
            
 | 
						|
    def _sendImpl(self, delayedResult):
 | 
						|
        wx.CallAfter(self.__consumer, delayedResult)
 | 
						|
        
 | 
						|
 | 
						|
class DelayedResult:
 | 
						|
    """
 | 
						|
    Represent the actual delayed result coming from the non-main thread. 
 | 
						|
    An instance of this is given to the result handler. This result is 
 | 
						|
    either a (reference to a) the value sent, or an exception. 
 | 
						|
    If the latter, the exception is raised when the get() method gets
 | 
						|
    called. 
 | 
						|
    """
 | 
						|
    
 | 
						|
    def __init__(self, result, jobID=None, exception = None):
 | 
						|
        """You should never have to call this yourself. A DelayedResult 
 | 
						|
        is created by a concrete Sender for you."""
 | 
						|
        self.__result = result
 | 
						|
        self.__exception = exception
 | 
						|
        self.__jobID = jobID
 | 
						|
 | 
						|
    def getJobID(self):
 | 
						|
        """Return the jobID given when Sender initialized, 
 | 
						|
        or None if none given. """
 | 
						|
        return self.__jobID 
 | 
						|
    
 | 
						|
    def get(self):
 | 
						|
        """Get the result. If an exception was sent instead of a result, 
 | 
						|
        (via Sender's sendExcept()), that **exception is raised**.
 | 
						|
        Otherwise the result is simply returned. """
 | 
						|
        if self.__exception: # exception was raised!
 | 
						|
            self.__exception.extraInfo = self.__result
 | 
						|
            raise self.__exception
 | 
						|
        
 | 
						|
        return self.__result
 | 
						|
 | 
						|
 | 
						|
class AbortedException(Exception):
 | 
						|
    """Raise this in your worker function so that the sender knows 
 | 
						|
    not to send a result to handler."""
 | 
						|
    pass
 | 
						|
    
 | 
						|
 | 
						|
class Producer(threading.Thread):
 | 
						|
    """
 | 
						|
    Represent the worker thread that produces delayed results. 
 | 
						|
    It causes the given function to run in a separate thread, 
 | 
						|
    and a sender to be used to send the return value of the function.
 | 
						|
    As with any threading.Thread, instantiate and call start().
 | 
						|
    Note that if the workerFn raises AbortedException, the result is not 
 | 
						|
    sent and the thread terminates gracefully.
 | 
						|
    """
 | 
						|
    
 | 
						|
    def __init__(self, sender, workerFn, args=(), kwargs={}, 
 | 
						|
                 name=None, group=None, daemon=False, 
 | 
						|
                 sendReturn=True, senderArg=None):
 | 
						|
        """The sender will send the return value of 
 | 
						|
        workerFn(*args, **kwargs) to the main thread. The name and group 
 | 
						|
        are same as threading.Thread constructor parameters. Daemon causes 
 | 
						|
        setDaemon() to be called. If sendReturn is False, then the return 
 | 
						|
        value of workerFn() will not be sent. If senderArg is given, it 
 | 
						|
        must be the name of the keyword arg to use to pass the sender into 
 | 
						|
        the workerFn, so the function can send (typically many) results."""
 | 
						|
        if senderArg:
 | 
						|
            kwargs[senderArg] = sender
 | 
						|
        def wrapper():
 | 
						|
            try: 
 | 
						|
                result = workerFn(*args, **kwargs)
 | 
						|
            except AbortedException:
 | 
						|
                pass
 | 
						|
            except Exception, exc:
 | 
						|
                extraInfo = self._extraInfo(exc)
 | 
						|
                sender.sendException(exc, extraInfo)
 | 
						|
            else:
 | 
						|
                if sendReturn:
 | 
						|
                    sender.sendResult(result)
 | 
						|
            
 | 
						|
        threading.Thread.__init__(self, name=name, group=group, target=wrapper)
 | 
						|
        if daemon:
 | 
						|
            self.setDaemon(daemon)
 | 
						|
    
 | 
						|
    def _extraInfo(self, exception):
 | 
						|
        """This method could be overridden in a derived class to provide 
 | 
						|
        extra information when an exception is being sent instead of a 
 | 
						|
        result. """
 | 
						|
        return None
 | 
						|
 | 
						|
 | 
						|
class AbortEvent:
 | 
						|
    """
 | 
						|
    Convenience class that represents a kind of threading.Event that
 | 
						|
    raises AbortedException when called (see the __call__ method, everything
 | 
						|
    else is just to make it look like threading.Event).
 | 
						|
    """
 | 
						|
    
 | 
						|
    def __init__(self):
 | 
						|
        self.__ev = threading.Event()
 | 
						|
 | 
						|
    def __call__(self, timeout=None):
 | 
						|
        """See if event has been set (wait at most timeout if given).  If so, 
 | 
						|
        raise AbortedException. Otherwise return None. Allows you to do
 | 
						|
        'while not event():' which will always succeed unless the event 
 | 
						|
        has been set (then AbortedException will cause while to exit)."""
 | 
						|
        if timeout:
 | 
						|
            self.__ev.wait(timeout)
 | 
						|
        if self.__ev.isSet():
 | 
						|
            raise AbortedException()
 | 
						|
        return None
 | 
						|
    
 | 
						|
    def __getattr__(self, name):
 | 
						|
        """This allows us to be a kind of threading.Event."""
 | 
						|
        if name in ('set','clear','wait','isSet'):
 | 
						|
            return getattr(self.__ev, name)
 | 
						|
 | 
						|
 | 
						|
def startWorker(
 | 
						|
    consumer, workerFn, 
 | 
						|
    cargs=(), ckwargs={}, 
 | 
						|
    wargs=(), wkwargs={},
 | 
						|
    jobID=None, group=None, daemon=False, 
 | 
						|
    sendReturn=True, senderArg=None):
 | 
						|
    """
 | 
						|
    Convenience function to send data produced by workerFn(*wargs, **wkwargs) 
 | 
						|
    running in separate thread, to a consumer(*cargs, **ckwargs) running in
 | 
						|
    the main thread. This function merely creates a SenderCallAfter (or a
 | 
						|
    SenderWxEvent, if consumer derives from wx.EvtHandler), and a Producer,
 | 
						|
    and returns immediately after starting the Producer thread. The jobID
 | 
						|
    is used for the Sender and as name for the Producer thread. Returns the 
 | 
						|
    thread created, in case caller needs join/etc.
 | 
						|
    """
 | 
						|
    
 | 
						|
    if isinstance(consumer, wx.EvtHandler):
 | 
						|
        eventClass = cargs[0]
 | 
						|
        sender = SenderWxEvent(consumer, eventClass, jobID=jobID, **ckwargs)
 | 
						|
    else:
 | 
						|
        sender = SenderCallAfter(consumer, jobID, args=cargs, kwargs=ckwargs)
 | 
						|
        
 | 
						|
    thread = Producer(
 | 
						|
        sender, workerFn, args=wargs, kwargs=wkwargs, 
 | 
						|
        name=jobID, group=group, daemon=daemon, 
 | 
						|
        senderArg=senderArg, sendReturn=sendReturn)
 | 
						|
        
 | 
						|
    thread.start() 
 | 
						|
    return thread
 | 
						|
 | 
						|
 | 
						|
class PreProcessChain:
 | 
						|
    """
 | 
						|
    Represent a 'delayed result pre-processing chain', a kind of Handler. 
 | 
						|
    Useful when lower-level objects need to apply a sequence of transformations 
 | 
						|
    to the delayed result before handing it over to a final handler. 
 | 
						|
    This allows the starter of the worker function to not know 
 | 
						|
    anything about the lower-level objects. 
 | 
						|
    """
 | 
						|
    def __init__(self, handler, *args, **kwargs):
 | 
						|
        """Wrap handler(result, *args, **kwargs) so that the result 
 | 
						|
        it receives has been transformed by us. """
 | 
						|
        if handler is None:# assume rhs is a chain
 | 
						|
            self.__chain = args[0]
 | 
						|
        else:
 | 
						|
            if args or kwargs:
 | 
						|
                handler = Handler(handler, *args, **kwargs)
 | 
						|
            self.__chain = [handler]
 | 
						|
 | 
						|
    def addSub(self, callable, *args, **kwargs):
 | 
						|
        """Add a sub-callable, ie a callable(result, *args, **kwargs)
 | 
						|
        that returns a transformed result to the previously added
 | 
						|
        sub-callable (or the handler given at construction, if this is 
 | 
						|
        the first call to addSub). """
 | 
						|
        self.__chain.append( Handler(callable, *args, **kwargs) )
 | 
						|
        
 | 
						|
    def clone(self):
 | 
						|
        """Clone the chain. Shallow only. Useful when several threads 
 | 
						|
        must be started but have different sub-callables. """
 | 
						|
        return PreProcessChain(None, self.__chain[:] )
 | 
						|
    
 | 
						|
    def cloneAddSub(self, callable, *args, **kwargs):
 | 
						|
        """Convenience method that first clones self, then calls addSub() 
 | 
						|
        on that clone with given arguments. """
 | 
						|
        cc = self.clone()
 | 
						|
        cc.addSub(callable, *args, **kwargs)
 | 
						|
        
 | 
						|
    def count(self):
 | 
						|
        """How many pre-processors in the chain"""
 | 
						|
        return len(self.__chain)
 | 
						|
    
 | 
						|
    class Traverser:
 | 
						|
        """
 | 
						|
        Traverses the chain of pre-processors it is given, transforming
 | 
						|
        the original delayedResult along the way. The return value of each 
 | 
						|
        callable added via addSub() is given to the previous addSub() callable,
 | 
						|
        until the handler is reached. 
 | 
						|
        """
 | 
						|
        def __init__(self, delayedResult, chain):
 | 
						|
            self.__dr = delayedResult
 | 
						|
            self.__chain = chain
 | 
						|
            
 | 
						|
        def get(self):
 | 
						|
            """This makes handler think we are a delayedResult."""
 | 
						|
            if not self.__chain:
 | 
						|
                return self.__dr.get()
 | 
						|
            
 | 
						|
            handler = self.__chain[0]
 | 
						|
            del self.__chain[0]
 | 
						|
            return handler(self)
 | 
						|
        
 | 
						|
        def getJobID(self):
 | 
						|
            """Return the job id for the delayedResult we transform."""
 | 
						|
            return self.__dr.getJobID()
 | 
						|
 | 
						|
 | 
						|
    def __call__(self, delayedResult):
 | 
						|
        """This makes us a Handler. We just call handler(Traverser). The
 | 
						|
        handler will think it is getting a delayed result, but in fact 
 | 
						|
        will be getting an instance of Traverser, which will take care
 | 
						|
        of properly applying the chain of transformations to delayedResult."""
 | 
						|
        chainTrav = self.Traverser(delayedResult, self.__chain[1:])
 | 
						|
        handler = self.__chain[0]
 | 
						|
        handler( chainTrav )
 | 
						|
        
 |