added wxSemaphore (with docs), new version of wxCondition and bug fixes to wxThread (patch 538242 by K.S. Sreeram)

git-svn-id: https://svn.wxwidgets.org/svn/wx/wxWidgets/trunk@14907 c3d73ce0-8a6f-49c7-b76d-6d57e0e08775
This commit is contained in:
Vadim Zeitlin
2002-04-02 13:15:16 +00:00
parent 8e2baee50d
commit be80986891
7 changed files with 850 additions and 353 deletions

View File

@@ -2,13 +2,14 @@
// Name: threadpsx.cpp
// Purpose: wxThread (Posix) Implementation
// Author: Original from Wolfram Gloger/Guilhem Lavaux
// Modified by:
// Modified by: K. S. Sreeram (2002): POSIXified wxCondition, added wxSemaphore
// Created: 04/22/98
// RCS-ID: $Id$
// Copyright: (c) Wolfram Gloger (1996, 1997)
// Guilhem Lavaux (1998)
// Vadim Zeitlin (1999-2002)
// Robert Roebling (1999)
// K. S. Sreeram (2002)
// Licence: wxWindows licence
/////////////////////////////////////////////////////////////////////////////
@@ -34,6 +35,7 @@
#include "wx/log.h"
#include "wx/intl.h"
#include "wx/dynarray.h"
#include "wx/timer.h"
#include <stdio.h>
#include <unistd.h>
@@ -72,15 +74,6 @@ static const wxThread::ExitCode EXITCODE_CANCELLED = (wxThread::ExitCode)-1;
// our trace mask
#define TRACE_THREADS _T("thread")
// ----------------------------------------------------------------------------
// pseudo template types
// ----------------------------------------------------------------------------
WX_DECLARE_LIST(pthread_mutex_t, wxMutexList);
#include "wx/listimpl.cpp"
WX_DEFINE_LIST(wxMutexList);
// ----------------------------------------------------------------------------
// private functions
// ----------------------------------------------------------------------------
@@ -88,35 +81,6 @@ WX_DEFINE_LIST(wxMutexList);
static void ScheduleThreadForDeletion();
static void DeleteThread(wxThread *This);
// ----------------------------------------------------------------------------
// private classes
// ----------------------------------------------------------------------------
// same as wxMutexLocker but for "native" mutex
class MutexLock
{
public:
MutexLock(pthread_mutex_t& mutex)
{
m_mutex = &mutex;
if ( pthread_mutex_lock(m_mutex) != 0 )
{
wxLogDebug(_T("pthread_mutex_lock() failed"));
}
}
~MutexLock()
{
if ( pthread_mutex_unlock(m_mutex) != 0 )
{
wxLogDebug(_T("pthread_mutex_unlock() failed"));
}
}
private:
pthread_mutex_t *m_mutex;
};
// ----------------------------------------------------------------------------
// types
// ----------------------------------------------------------------------------
@@ -143,7 +107,7 @@ static pthread_key_t gs_keySelf;
static size_t gs_nThreadsBeingDeleted = 0;
// a mutex to protect gs_nThreadsBeingDeleted
static pthread_mutex_t gs_mutexDeleteThread;
static wxMutex *gs_mutexDeleteThread = (wxMutex *)NULL;
// and a condition variable which will be signaled when all
// gs_nThreadsBeingDeleted will have been deleted
@@ -174,6 +138,8 @@ public:
private:
pthread_mutex_t m_mutex;
friend class wxConditionInternal;
};
wxMutexInternal::wxMutexInternal()
@@ -342,174 +308,107 @@ wxMutexError wxMutex::Unlock()
return m_internal->Unlock();
}
// ============================================================================
// ===========================================================================
// wxCondition implementation
// ============================================================================
// ===========================================================================
// ----------------------------------------------------------------------------
// ---------------------------------------------------------------------------
// wxConditionInternal
// ----------------------------------------------------------------------------
// ---------------------------------------------------------------------------
// The native POSIX condition variables are dumb: if the condition is signaled
// before another thread starts to wait on it, the signal is lost and so this
// other thread will be never woken up. It's much more convenient to us to
// remember that the condition was signaled and to return from Wait()
// immediately in this case (this is more like Win32 automatic event objects)
class wxConditionInternal
{
public:
wxConditionInternal();
wxConditionInternal( wxMutex *mutex );
~wxConditionInternal();
// wait with the given timeout or indefinitely if NULL
bool Wait(const timespec* ts = NULL);
void Wait();
void Signal(bool all = FALSE);
bool Wait( const timespec *ts );
void Signal();
void Broadcast();
private:
// the number of Signal() calls we "missed", i.e. which were done while
// there were no threads to wait for them
size_t m_nQueuedSignals;
// counts all pending waiters
size_t m_nWaiters;
// the condition itself
pthread_cond_t m_condition;
// the mutex used with the conditon: it also protects the counters above
pthread_mutex_t m_mutex;
wxMutex *m_mutex;
pthread_cond_t m_cond;
};
wxConditionInternal::wxConditionInternal()
wxConditionInternal::wxConditionInternal( wxMutex *mutex )
{
m_nQueuedSignals =
m_nWaiters = 0;
if ( pthread_cond_init(&m_condition, (pthread_condattr_t *)NULL) != 0 )
m_mutex = mutex;
if ( pthread_cond_init( &m_cond, NULL ) != 0 )
{
// this is supposed to never happen
wxFAIL_MSG( _T("pthread_cond_init() failed") );
}
if ( pthread_mutex_init(&m_mutex, NULL) != 0 )
{
// neither this
wxFAIL_MSG( _T("wxCondition: pthread_mutex_init() failed") );
wxLogDebug(_T("pthread_cond_init() failed"));
}
}
wxConditionInternal::~wxConditionInternal()
{
if ( pthread_cond_destroy( &m_condition ) != 0 )
if ( pthread_cond_destroy( &m_cond ) != 0 )
{
wxLogDebug(_T("Failed to destroy condition variable (some "
"threads are probably still waiting on it?)"));
}
if ( pthread_mutex_destroy( &m_mutex ) != 0 )
{
wxLogDebug(_T("Failed to destroy mutex (it is probably locked)"));
wxLogDebug(_T("pthread_cond_destroy() failed"));
}
}
bool wxConditionInternal::Wait(const timespec* ts)
void wxConditionInternal::Wait()
{
MutexLock lock(m_mutex);
if ( m_nQueuedSignals )
if ( pthread_cond_wait( &m_cond, &(m_mutex->m_internal->m_mutex) ) != 0 )
{
m_nQueuedSignals--;
wxLogTrace(TRACE_THREADS,
_T("wxCondition(%08x)::Wait(): Has been signaled before"),
this);
return TRUE;
wxLogDebug(_T("pthread_cond_wait() failed"));
}
// there are no queued signals, so start really waiting
m_nWaiters++;
// calling wait function below unlocks the mutex and Signal() or
// Broadcast() will be able to continue to run now if they were
// blocking for it in the loop locking all mutexes)
wxLogTrace(TRACE_THREADS,
_T("wxCondition(%08x)::Wait(): starting to wait"), this);
int err = ts ? pthread_cond_timedwait(&m_condition, &m_mutex, ts)
: pthread_cond_wait(&m_condition, &m_mutex);
switch ( err )
{
case 0:
// condition was signaled
wxLogTrace(TRACE_THREADS,
_T("wxCondition(%08x)::Wait(): ok"), this);
break;
default:
wxLogDebug(_T("unexpected pthread_cond_[timed]wait() return"));
// fall through
case ETIMEDOUT:
case EINTR:
// The condition has not been signaled, so we have to
// decrement the counter manually
--m_nWaiters;
// wait interrupted or timeout elapsed
wxLogTrace(TRACE_THREADS,
_T("wxCondition(%08x)::Wait(): timeout/intr"), this);
}
return err == 0;
}
void wxConditionInternal::Signal(bool all)
bool wxConditionInternal::Wait( const timespec *ts )
{
// make sure that only one Signal() or Broadcast() is in progress
MutexLock lock(m_mutex);
int result = pthread_cond_timedwait( &m_cond,
&(m_mutex->m_internal->m_mutex),
ts );
if ( result == ETIMEDOUT )
return FALSE;
// Are there any waiters?
if ( m_nWaiters == 0 )
{
// No, there are not, so don't signal but keep in mind for the next
// Wait()
m_nQueuedSignals++;
wxASSERT_MSG( result == 0, _T("pthread_cond_timedwait() failed") );
return;
return TRUE;
}
void wxConditionInternal::Signal()
{
int result = pthread_cond_signal( &m_cond );
if ( result != 0 )
{
wxFAIL_MSG( _T("pthread_cond_signal() failed") );
}
}
// now we can finally signal it
wxLogTrace(TRACE_THREADS, _T("wxCondition(%08x)::Signal(): preparing to %s"),
this, all ? _T("broadcast") : _T("signal"));
void wxConditionInternal::Broadcast()
{
int result = pthread_cond_broadcast( &m_cond );
if ( result != 0 )
{
wxFAIL_MSG( _T("pthread_cond_broadcast() failed") );
}
}
int err = all ? pthread_cond_broadcast(&m_condition)
: pthread_cond_signal(&m_condition);
if ( all )
// ---------------------------------------------------------------------------
// wxCondition
// ---------------------------------------------------------------------------
wxCondition::wxCondition( wxMutex *mutex )
{
if ( !mutex )
{
m_nWaiters = 0;
wxFAIL_MSG( _T("NULL mutex in wxCondition ctor") );
m_internal = NULL;
}
else
{
--m_nWaiters;
m_internal = new wxConditionInternal( mutex );
}
if ( err )
{
// shouldn't ever happen
wxFAIL_MSG(_T("pthread_cond_{broadcast|signal}() failed"));
}
}
// ----------------------------------------------------------------------------
// wxCondition
// ----------------------------------------------------------------------------
wxCondition::wxCondition()
{
m_internal = new wxConditionInternal;
}
wxCondition::~wxCondition()
@@ -519,32 +418,269 @@ wxCondition::~wxCondition()
void wxCondition::Wait()
{
(void)m_internal->Wait();
if ( m_internal )
m_internal->Wait();
}
bool wxCondition::Wait(unsigned long sec, unsigned long nsec)
bool wxCondition::Wait( unsigned long timeout_millis )
{
wxCHECK_MSG( m_internal, FALSE, _T("can't wait on uninitalized condition") );
wxLongLong curtime = wxGetLocalTimeMillis();
curtime += timeout_millis;
wxLongLong temp = curtime / 1000;
int sec = temp.GetLo();
temp = temp * 1000;
temp = curtime - temp;
int millis = temp.GetLo();
timespec tspec;
tspec.tv_sec = time(0L) + sec; // FIXME is time(0) correct here?
tspec.tv_nsec = nsec;
tspec.tv_sec = sec;
tspec.tv_nsec = millis * 1000L * 1000L;
return m_internal->Wait(&tspec);
}
void wxCondition::Signal()
{
m_internal->Signal();
if ( m_internal )
m_internal->Signal();
}
void wxCondition::Broadcast()
{
m_internal->Signal(TRUE /* all */);
if ( m_internal )
m_internal->Broadcast();
}
// ============================================================================
// ===========================================================================
// wxSemaphore implementation
// ===========================================================================
// ---------------------------------------------------------------------------
// wxSemaphoreInternal
// ---------------------------------------------------------------------------
class wxSemaphoreInternal
{
public:
wxSemaphoreInternal( int initialcount, int maxcount );
void Wait();
bool TryWait();
bool Wait( unsigned long timeout_millis );
void Post();
private:
wxMutex m_mutex;
wxCondition m_cond;
int count,
maxcount;
};
wxSemaphoreInternal::wxSemaphoreInternal( int initialcount, int maxcount )
: m_cond(m_mutex)
{
if ( (initialcount < 0) || ((maxcount > 0) && (initialcount > maxcount)) )
{
wxFAIL_MSG( _T("wxSemaphore: invalid initial count") );
}
maxcount = maxcount;
count = initialcount;
}
void wxSemaphoreInternal::Wait()
{
wxMutexLocker locker(*m_mutex);
while ( count <= 0 )
{
m_cond->Wait();
}
count--;
}
bool wxSemaphoreInternal::TryWait()
{
wxMutexLocker locker(*m_mutex);
if ( count <= 0 )
return FALSE;
count--;
return TRUE;
}
bool wxSemaphoreInternal::Wait( unsigned long timeout_millis )
{
wxMutexLocker locker( *m_mutex );
wxLongLong startTime = wxGetLocalTimeMillis();
while ( count <= 0 )
{
wxLongLong elapsed = wxGetLocalTimeMillis() - startTime;
long remainingTime = (long)timeout_millis - (long)elapsed.GetLo();
if ( remainingTime <= 0 )
return FALSE;
bool result = m_cond->Wait( remainingTime );
if ( !result )
return FALSE;
}
count--;
return TRUE;
}
void wxSemaphoreInternal::Post()
{
wxMutexLocker locker(*m_mutex);
if ( (maxcount > 0) && (count == maxcount) )
{
wxFAIL_MSG( _T("wxSemaphore::Post() overflow") );
}
count++;
m_cond->Signal();
}
// --------------------------------------------------------------------------
// wxSemaphore
// --------------------------------------------------------------------------
wxSemaphore::wxSemaphore( int initialcount, int maxcount )
{
m_internal = new wxSemaphoreInternal( initialcount, maxcount );
}
wxSemaphore::~wxSemaphore()
{
delete m_internal;
}
void wxSemaphore::Wait()
{
m_internal->Wait();
}
bool wxSemaphore::TryWait()
{
return m_internal->TryWait();
}
bool wxSemaphore::Wait( unsigned long timeout_millis )
{
return m_internal->Wait( timeout_millis );
}
void wxSemaphore::Post()
{
m_internal->Post();
}
// This class is used by wxThreadInternal to support Delete() on
// a detached thread
class wxRefCountedCondition
{
public:
// start with a initial reference count of 1
wxRefCountedCondition()
{
m_refCount = 1;
m_signaled = FALSE;
m_mutex = new wxMutex();
m_cond = new wxCondition( m_mutex );
}
// increment the reference count
void AddRef()
{
wxMutexLocker locker( *m_mutex );
m_refCount++;
}
// decrement the reference count if reference count is zero then delete the
// object
void DeleteRef()
{
bool shouldDelete = FALSE;
m_mutex->Lock();
if ( --m_refCount == 0 )
{
shouldDelete = TRUE;
}
m_mutex->Unlock();
if ( shouldDelete )
{
delete this;
}
}
// sets the object to signaled this signal will be a persistent signal all
// further Wait()s on the object will return without blocking
void SetSignaled()
{
wxMutexLocker locker( *m_mutex );
m_signaled = TRUE;
m_cond->Broadcast();
}
// wait till the object is signaled if the object was already signaled then
// return immediately
void Wait()
{
wxMutexLocker locker( *m_mutex );
if ( !m_signaled )
{
m_cond->Wait();
}
}
private:
int m_refCount;
wxMutex *m_mutex;
wxCondition *m_cond;
bool m_signaled;
// Cannot delete this object directly, call DeleteRef() instead
~wxRefCountedCondition()
{
delete m_cond;
delete m_mutex;
}
// suppress gcc warning about the class having private dtor and not having
// friend (so what??)
friend class wxDummyFriend;
};
// ===========================================================================
// wxThread implementation
// ============================================================================
// ===========================================================================
// the thread callback functions must have the C linkage
extern "C"
@@ -580,7 +716,7 @@ public:
// wake up threads waiting for our termination
void SignalExit();
// wake up threads waiting for our start
void SignalRun() { m_condRun.Signal(); }
void SignalRun() { m_semRun.Post(); }
// go to sleep until Resume() is called
void Pause();
// resume the thread
@@ -630,7 +766,7 @@ private:
// this flag is set when the thread should terminate
bool m_cancelled;
// this flag is set when the thread is blocking on m_condSuspend
// this flag is set when the thread is blocking on m_semSuspend
bool m_isPaused;
// the thread exit code - only used for joinable (!detached) threads and
@@ -644,22 +780,18 @@ private:
bool m_shouldBroadcast;
bool m_isDetached;
// VZ: it's possible that we might do with less than three different
// condition objects - for example, m_condRun and m_condEnd a priori
// won't be used in the same time. But for now I prefer this may be a
// bit less efficient but safer solution of having distinct condition
// variables for each purpose.
// this condition is signaled by Run() and the threads Entry() is not
// this semaphore is posted by Run() and the threads Entry() is not
// called before it is done
wxCondition m_condRun;
wxSemaphore m_semRun;
// this one is signaled when the thread should resume after having been
// Pause()d
wxCondition m_condSuspend;
wxSemaphore m_semSuspend;
// finally this one is signalled when the thread exits
wxCondition m_condEnd;
// we are using a reference counted condition to support
// Delete() for a detached thread
wxRefCountedCondition *m_condEnd;
};
// ----------------------------------------------------------------------------
@@ -697,8 +829,8 @@ void *wxThreadInternal::PthreadStart(wxThread *thread)
pthread_cleanup_push(wxPthreadCleanup, thread);
#endif // HAVE_THREAD_CLEANUP_FUNCTIONS
// wait for the condition to be signaled from Run()
pthread->m_condRun.Wait();
// wait for the semaphore to be posted from Run()
pthread->m_semRun.Wait();
// test whether we should run the run at all - may be it was deleted
// before it started to Run()?
@@ -792,17 +924,20 @@ wxThreadInternal::wxThreadInternal()
m_threadId = 0;
m_exitcode = 0;
// set to TRUE only when the thread starts waiting on m_condSuspend
// set to TRUE only when the thread starts waiting on m_semSuspend
m_isPaused = FALSE;
// defaults for joinable threads
m_shouldBeJoined = TRUE;
m_shouldBroadcast = TRUE;
m_isDetached = FALSE;
m_condEnd = new wxRefCountedCondition();
}
wxThreadInternal::~wxThreadInternal()
{
m_condEnd->DeleteRef();
}
wxThreadError wxThreadInternal::Run()
@@ -832,7 +967,15 @@ void wxThreadInternal::Wait()
// wait until the thread terminates (we're blocking in _another_ thread,
// of course)
m_condEnd.Wait();
// a reference counting condition is used to handle the
// case where a detached thread deletes itself
// before m_condEnd->Wait() returns
// in this case the deletion of the condition object is deferred until
// all Wait()ing threads have finished calling DeleteRef()
m_condEnd->AddRef();
m_condEnd->Wait();
m_condEnd->DeleteRef();
wxLogTrace(TRACE_THREADS, _T("Finished waiting for thread %ld."), id);
@@ -877,7 +1020,7 @@ void wxThreadInternal::SignalExit()
wxLogTrace(TRACE_THREADS, _T("Thread %ld signals end condition."),
GetId());
m_condEnd.Broadcast();
m_condEnd->SetSignaled();
}
}
@@ -890,8 +1033,8 @@ void wxThreadInternal::Pause()
wxLogTrace(TRACE_THREADS, _T("Thread %ld goes to sleep."), GetId());
// wait until the condition is signaled from Resume()
m_condSuspend.Wait();
// wait until the semaphore is Post()ed from Resume()
m_semSuspend.Wait();
}
void wxThreadInternal::Resume()
@@ -906,7 +1049,7 @@ void wxThreadInternal::Resume()
wxLogTrace(TRACE_THREADS, _T("Waking up thread %ld"), GetId());
// wake up Pause()
m_condSuspend.Signal();
m_semSuspend.Post();
// reset the flag
SetReallyPaused(FALSE);
@@ -1307,7 +1450,7 @@ wxThreadError wxThread::Delete(ExitCode *rc)
{
case STATE_NEW:
// we need to wake up the thread so that PthreadStart() will
// terminate - right now it's blocking on m_condRun
// terminate - right now it's blocking on m_semRun
m_internal->SignalRun();
// fall through
@@ -1490,7 +1633,8 @@ wxThread::~wxThread()
// is not called for the joinable threads, so do it here
if ( !m_isDetached )
{
MutexLock lock(gs_mutexDeleteThread);
wxMutexLocker lock( *gs_mutexDeleteThread );
gs_nThreadsBeingDeleted--;
wxLogTrace(TRACE_THREADS, _T("%u scheduled for deletion threads left."),
@@ -1566,9 +1710,8 @@ bool wxThreadModule::OnInit()
gs_mutexGui->Lock();
#endif // wxUSE_GUI
// under Solaris we get a warning from CC when using
// PTHREAD_MUTEX_INITIALIZER, so do it dynamically
pthread_mutex_init(&gs_mutexDeleteThread, NULL);
gs_mutexDeleteThread = new wxMutex();
gs_condAllDeleted = new wxCondition( gs_mutexDeleteThread );
return TRUE;
}
@@ -1579,18 +1722,19 @@ void wxThreadModule::OnExit()
// are there any threads left which are being deleted right now?
size_t nThreadsBeingDeleted;
{
MutexLock lock(gs_mutexDeleteThread);
wxMutexLocker lock( *gs_mutexDeleteThread );
nThreadsBeingDeleted = gs_nThreadsBeingDeleted;
}
if ( nThreadsBeingDeleted > 0 )
{
wxLogTrace(TRACE_THREADS, _T("Waiting for %u threads to disappear"),
nThreadsBeingDeleted);
if ( nThreadsBeingDeleted > 0 )
{
wxLogTrace(TRACE_THREADS, _T("Waiting for %u threads to disappear"),
nThreadsBeingDeleted);
// have to wait until all of them disappear
gs_condAllDeleted->Wait();
// have to wait until all of them disappear
gs_condAllDeleted->Wait();
}
}
// terminate any threads left
@@ -1617,6 +1761,9 @@ void wxThreadModule::OnExit()
// and free TLD slot
(void)pthread_key_delete(gs_keySelf);
delete gs_condAllDeleted;
delete gs_mutexDeleteThread;
}
// ----------------------------------------------------------------------------
@@ -1625,12 +1772,7 @@ void wxThreadModule::OnExit()
static void ScheduleThreadForDeletion()
{
MutexLock lock(gs_mutexDeleteThread);
if ( gs_nThreadsBeingDeleted == 0 )
{
gs_condAllDeleted = new wxCondition;
}
wxMutexLocker lock( *gs_mutexDeleteThread );
gs_nThreadsBeingDeleted++;
@@ -1643,17 +1785,15 @@ static void DeleteThread(wxThread *This)
{
// gs_mutexDeleteThread should be unlocked before signalling the condition
// or wxThreadModule::OnExit() would deadlock
{
MutexLock lock(gs_mutexDeleteThread);
wxMutexLocker locker( *gs_mutexDeleteThread );
wxLogTrace(TRACE_THREADS, _T("Thread %ld auto deletes."), This->GetId());
wxLogTrace(TRACE_THREADS, _T("Thread %ld auto deletes."), This->GetId());
delete This;
delete This;
wxCHECK_RET( gs_nThreadsBeingDeleted > 0,
wxCHECK_RET( gs_nThreadsBeingDeleted > 0,
_T("no threads scheduled for deletion, yet we delete "
"one?") );
}
wxLogTrace(TRACE_THREADS, _T("%u scheduled for deletion threads left."),
gs_nThreadsBeingDeleted - 1);
@@ -1662,9 +1802,6 @@ static void DeleteThread(wxThread *This)
{
// no more threads left, signal it
gs_condAllDeleted->Signal();
delete gs_condAllDeleted;
gs_condAllDeleted = (wxCondition *)NULL;
}
}