made wxCondition::Signal() queue the signals generated while there were no waiters and documented this behaviour

git-svn-id: https://svn.wxwidgets.org/svn/wx/wxWidgets/trunk@13956 c3d73ce0-8a6f-49c7-b76d-6d57e0e08775
This commit is contained in:
Vadim Zeitlin
2002-02-01 15:43:37 +00:00
parent 2e1e7f9dfc
commit 60ce696e26
3 changed files with 111 additions and 133 deletions

View File

@@ -11,8 +11,63 @@ perfect because in this particular case it would be much better to just
\helpref{Wait()}{wxthreadwait} for the worker thread, but if there are several \helpref{Wait()}{wxthreadwait} for the worker thread, but if there are several
worker threads it already makes much more sense). worker threads it already makes much more sense).
Once the thread(s) are signaled, the condition then resets to the not Note that a call to \helpref{Signal()}{wxconditionsignal} may happen before the
signaled state, ready to fire again. other thread calls \helpref{Wait()}{wxconditionwait} but, in marked contrast
with the pthread conditions, this will still work as the missed signals are
queued and \helpref{Wait()}{wxconditionwait} simply returns immediately if
there are ny pending signals.
However, the calls to \helpref{Broadcast()}{wxconditionbroadcast} are {\bf not}
queued and so it will only wake up the threads already waiting on the
condition. Accordingly, you will probably want to use a mutex to ensure that
the thread(s) you want to be waken up have indeed started to wait before
calling \helpref{Broadcast()}{wxconditionbroadcast}.
\wxheading{Example}
This example shows how a main thread may launch a worker thread and wait until
it starts running:
\begin{verbatim}
class MyWaitingThread : public wxThread
{
public:
MyWaitingThread(wxCondition *condition)
{
m_condition = condition;
Create();
}
virtual ExitCode Entry()
{
// let the main thread know that we started running
m_condition->Signal();
... do our job ...
return 0;
}
private:
wxCondition *m_condition;
};
int main()
{
wxCondition condition;
MyWaitingThread *thread - new MyWaitingThread(&condition);
thread->Run();
// wait until the thread really starts running
condition.Wait();
...
return 0;
}
\end{verbatim}
\wxheading{Derived from} \wxheading{Derived from}

View File

@@ -1117,8 +1117,11 @@ WX_DECLARE_HASH_MAP( short, unsigned, wxIntegerHash, wxIntegerEqual,
myTestHashMap3 ); myTestHashMap3 );
WX_DECLARE_HASH_MAP( unsigned short, unsigned, wxIntegerHash, wxIntegerEqual, WX_DECLARE_HASH_MAP( unsigned short, unsigned, wxIntegerHash, wxIntegerEqual,
myTestHashMap4 ); myTestHashMap4 );
WX_DECLARE_HASH_MAP( wxString, wxString, wxStringHash, wxStringEqual,
myStringHashMap ); // same as:
// WX_DECLARE_HASH_MAP( wxString, wxString, wxStringHash, wxStringEqual,
// myStringHashMap );
WX_DECLARE_STRING_HASH_MAP(wxString, myStringHashMap);
typedef myStringHashMap::iterator Itor; typedef myStringHashMap::iterator Itor;
@@ -1237,7 +1240,7 @@ static void TestHashMap()
printf("*** Finished testing wxHashMap ***\n"); printf("*** Finished testing wxHashMap ***\n");
} }
#endif TEST_HASHMAP #endif // TEST_HASHMAP
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
// wxList // wxList
@@ -4790,7 +4793,7 @@ static void TestThreadConditions()
condition.GetId(), gs_cond.GetId()); condition.GetId(), gs_cond.GetId());
// create and launch threads // create and launch threads
MyWaitingThread *threads[5]; MyWaitingThread *threads[10];
size_t n; size_t n;
for ( n = 0; n < WXSIZEOF(threads); n++ ) for ( n = 0; n < WXSIZEOF(threads); n++ )
@@ -4823,13 +4826,15 @@ static void TestThreadConditions()
wxThread::Sleep(500); wxThread::Sleep(500);
#if 0 #if 1
// now wake one of them up // now wake one of them up
printf("Main thread: about to signal the condition.\n"); printf("Main thread: about to signal the condition.\n");
fflush(stdout); fflush(stdout);
condition.Signal(); condition.Signal();
#endif #endif
wxThread::Sleep(200);
// wake all the (remaining) threads up, so that they can exit // wake all the (remaining) threads up, so that they can exit
printf("Main thread: about to broadcast the condition.\n"); printf("Main thread: about to broadcast the condition.\n");
fflush(stdout); fflush(stdout);

View File

@@ -367,39 +367,24 @@ public:
void Signal(bool all = FALSE); void Signal(bool all = FALSE);
private: private:
// the number of Signal() calls queued // the number of Signal() calls we "missed", i.e. which were done while
// // there were no threads to wait for them
// changed by Signal(), accessed by Wait() size_t m_nQueuedSignals;
//
// protected by m_mutexSignalCount
size_t m_nSignalsMissed;
// protects access to m_nSignalsMissed // counts all pending waiters
pthread_mutex_t m_mutexSignalCount; size_t m_nWaiters;
// serializes Broadcast() and/or Signal() calls
//
// TODO: I'm not sure if this is really needed but it shouldn't harm
// neither (except for efficiency condierations). However MSW doesn't
// do this so maybe we shouldn't do it here neither? (VZ)
pthread_mutex_t m_mutexSignal;
// a condition variable must be always used with a mutex and so we maintain
// a list of mutexes - one for each thread that calls Wait().
//
// access to this list must be protected by m_mutexListContents
wxMutexList m_mutexes;
// the condition itself // the condition itself
pthread_cond_t m_condition; pthread_cond_t m_condition;
// protects all accesses to m_mutexes list // the mutex used with the conditon: it also protects the counters above
pthread_mutex_t m_mutexListContents; pthread_mutex_t m_mutex;
}; };
wxConditionInternal::wxConditionInternal() wxConditionInternal::wxConditionInternal()
{ {
m_nSignalsMissed = 0; m_nQueuedSignals =
m_nWaiters = 0;
if ( pthread_cond_init(&m_condition, (pthread_condattr_t *)NULL) != 0 ) if ( pthread_cond_init(&m_condition, (pthread_condattr_t *)NULL) != 0 )
{ {
@@ -407,9 +392,7 @@ wxConditionInternal::wxConditionInternal()
wxFAIL_MSG( _T("pthread_cond_init() failed") ); wxFAIL_MSG( _T("pthread_cond_init() failed") );
} }
if ( pthread_mutex_init(&m_mutexSignalCount, NULL) != 0 || if ( pthread_mutex_init(&m_mutex, NULL) != 0 )
pthread_mutex_init(&m_mutexListContents, NULL) != 0 ||
pthread_mutex_init(&m_mutexSignal, NULL) != 0 )
{ {
// neither this // neither this
wxFAIL_MSG( _T("wxCondition: pthread_mutex_init() failed") ); wxFAIL_MSG( _T("wxCondition: pthread_mutex_init() failed") );
@@ -424,13 +407,7 @@ wxConditionInternal::~wxConditionInternal()
"threads are probably still waiting on it?)")); "threads are probably still waiting on it?)"));
} }
// the list of waiters mutexes must be empty by now if ( pthread_mutex_destroy( &m_mutex ) != 0 )
wxASSERT_MSG( !m_mutexes.GetFirst(),
_T("deleting condition someone is still waiting on?") );
if ( pthread_mutex_destroy( &m_mutexSignalCount ) != 0 ||
pthread_mutex_destroy( &m_mutexListContents ) != 0 ||
pthread_mutex_destroy( &m_mutexSignal ) != 0 )
{ {
wxLogDebug(_T("Failed to destroy mutex (it is probably locked)")); wxLogDebug(_T("Failed to destroy mutex (it is probably locked)"));
} }
@@ -438,53 +415,30 @@ wxConditionInternal::~wxConditionInternal()
bool wxConditionInternal::Wait(const timespec* ts) bool wxConditionInternal::Wait(const timespec* ts)
{ {
MutexLock lock(m_mutex);
if ( m_nQueuedSignals )
{ {
m_nQueuedSignals--;
wxLogTrace(TRACE_THREADS, wxLogTrace(TRACE_THREADS,
_T("wxCondition(%08x)::Wait: about to lock missed signal counter"), _T("wxCondition(%08x)::Wait(): Has been signaled before"),
this); this);
MutexLock lock(m_mutexSignalCount); return TRUE;
if ( m_nSignalsMissed )
{
// the condition was signaled before we started to wait, just
// decrease the number of queued signals and return
m_nSignalsMissed--;
wxLogTrace(TRACE_THREADS,
_T("wxCondition(%08x)::Wait: not waiting at all, count = %u"),
this, m_nSignalsMissed);
return TRUE;
}
} }
// we need to really wait, create a new mutex for this // there are no queued signals, so start really waiting
pthread_mutex_t *mutex = new pthread_mutex_t; m_nWaiters++;
if ( pthread_mutex_init(mutex, (pthread_mutexattr_t *)NULL) != 0 )
{
// not supposed to happen
wxFAIL_MSG( _T("pthread_mutex_init() failed when starting waiting") );
}
// lock the mutex before starting to wait on it
pthread_mutex_lock(mutex);
// lock the list before modifying it
wxMutexList::Node *mutexNode;
{
MutexLock lockList(m_mutexListContents);
mutexNode = m_mutexes.Append(mutex);
}
// calling wait function below unlocks the mutex and Signal() or // calling wait function below unlocks the mutex and Signal() or
// Broadcast() will be able to continue to run now if they were // Broadcast() will be able to continue to run now if they were
// blocking for it in the loop locking all mutexes) // blocking for it in the loop locking all mutexes)
wxLogTrace(TRACE_THREADS, wxLogTrace(TRACE_THREADS,
_T("wxCondition(%08x)::Wait(): starting to wait"), this); _T("wxCondition(%08x)::Wait(): starting to wait"), this);
int err = ts ? pthread_cond_timedwait(&m_condition, mutex, ts)
: pthread_cond_wait(&m_condition, mutex); int err = ts ? pthread_cond_timedwait(&m_condition, &m_mutex, ts)
: pthread_cond_wait(&m_condition, &m_mutex);
switch ( err ) switch ( err )
{ {
case 0: case 0:
@@ -499,67 +453,31 @@ bool wxConditionInternal::Wait(const timespec* ts)
case ETIMEDOUT: case ETIMEDOUT:
case EINTR: case EINTR:
// The condition has not been signaled, so we have to
// decrement the counter manually
--m_nWaiters;
// wait interrupted or timeout elapsed // wait interrupted or timeout elapsed
wxLogTrace(TRACE_THREADS, wxLogTrace(TRACE_THREADS,
_T("wxCondition(%08x)::Wait(): timeout/intr"), this); _T("wxCondition(%08x)::Wait(): timeout/intr"), this);
} }
// delete the mutex we had used for waiting
{
MutexLock lock(m_mutexListContents);
pthread_mutex_t *m = mutexNode->GetData();
pthread_mutex_unlock(m);
pthread_mutex_destroy(m);
delete m;
m_mutexes.DeleteNode(mutexNode);
}
return err == 0; return err == 0;
} }
void wxConditionInternal::Signal(bool all) void wxConditionInternal::Signal(bool all)
{ {
// make sure that only one Signal() or Broadcast() is in progress // make sure that only one Signal() or Broadcast() is in progress
MutexLock lock(m_mutexSignal); MutexLock lock(m_mutex);
// this mutex has to be locked as well, so that during the entire Signal()
// call, no new Wait() is going to wreak havoc (it will block in the very
// beginning on this mutex instead)
MutexLock lockSignalCount(m_mutexSignalCount);
wxLogTrace(TRACE_THREADS,
_T("wxCondition(%08x)::Signal(): got signal count mutex"),
this);
// Are there any waiters?
if ( m_nWaiters == 0 )
{ {
MutexLock lockList(m_mutexListContents); // No, there are not, so don't signal but keep in mind for the next
// Wait()
m_nQueuedSignals++;
if ( !m_mutexes.GetFirst() ) return;
{
// nobody is waiting for us, just remember that the condition was
// signaled and don't do anything else for now
m_nSignalsMissed++;
wxLogTrace(TRACE_THREADS,
_T("wxCondition(%08x)::Signal(): no waiters, count = %u"),
this, m_nSignalsMissed);
return;
}
}
wxLogTrace(TRACE_THREADS,
_T("wxCondition(%08x)::Signal(): acquiring all mutexes"), this);
// All mutexes on the list have to be locked. This means that execution of
// Signal() goes on as soon as all pending Wait() calls have called
// pthread_cond_wait() (where the mutex gets unlocked internally)
wxMutexList::Node *node;
for ( node = m_mutexes.GetFirst(); node; node = node->GetNext() )
{
pthread_mutex_lock(node->GetData());
} }
// now we can finally signal it // now we can finally signal it
@@ -568,21 +486,21 @@ void wxConditionInternal::Signal(bool all)
int err = all ? pthread_cond_broadcast(&m_condition) int err = all ? pthread_cond_broadcast(&m_condition)
: pthread_cond_signal(&m_condition); : pthread_cond_signal(&m_condition);
if ( all )
{
m_nWaiters = 0;
}
else
{
--m_nWaiters;
}
if ( err ) if ( err )
{ {
// shouldn't ever happen // shouldn't ever happen
wxFAIL_MSG(_T("pthread_cond_{broadcast|signal}() failed")); wxFAIL_MSG(_T("pthread_cond_{broadcast|signal}() failed"));
} }
// unlock all mutexes so that the threads blocking in their Wait()s could
// continue running
for ( node = m_mutexes.GetFirst(); node; node = node->GetNext() )
{
pthread_mutex_unlock(node->GetData());
}
wxLogTrace(TRACE_THREADS,
_T("wxCondition(%08x)::Signal(): exiting"), this);
} }
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------