Remove the worker thread from wxWebSessionCURL
Instead of having wxWebSessionCURL run a worker thread that uses curl to monitor and process network activity, set up a separate socket poller class to monitor socket activity. The socket poller class will throw an event back to wxWebSessionCURL when it detects activity on the sockets so that it can tell curl to process the activity in the main thread.
This commit is contained in:
@@ -16,12 +16,14 @@
|
||||
|
||||
#include "wx/thread.h"
|
||||
#include "wx/vector.h"
|
||||
#include "wx/timer.h"
|
||||
|
||||
#include "curl/curl.h"
|
||||
|
||||
class wxWebRequestCURL;
|
||||
class wxWebResponseCURL;
|
||||
class wxWebSessionCURL;
|
||||
class SocketPoller;
|
||||
|
||||
class wxWebAuthChallengeCURL : public wxWebAuthChallengeImpl
|
||||
{
|
||||
@@ -123,7 +125,7 @@ private:
|
||||
wxDECLARE_NO_COPY_CLASS(wxWebResponseCURL);
|
||||
};
|
||||
|
||||
class wxWebSessionCURL : public wxWebSessionImpl, private wxThreadHelper
|
||||
class wxWebSessionCURL : public wxWebSessionImpl, public wxEvtHandler
|
||||
{
|
||||
public:
|
||||
wxWebSessionCURL();
|
||||
@@ -147,25 +149,21 @@ public:
|
||||
|
||||
void CancelRequest(wxWebRequestCURL* request);
|
||||
|
||||
protected:
|
||||
wxThread::ExitCode Entry() wxOVERRIDE;
|
||||
|
||||
private:
|
||||
static int TimerCallback(CURLM*, long, void*);
|
||||
static int SocketCallback(CURL*, curl_socket_t, int, void*, void*);
|
||||
|
||||
void ProcessTimerCallback(long);
|
||||
void TimeoutNotification(wxTimerEvent&);
|
||||
void ProcessTimeoutNotification();
|
||||
void ProcessSocketCallback(curl_socket_t, int);
|
||||
void ProcessSocketPollerResult(wxThreadEvent&);
|
||||
void CheckForCompletedTransfers();
|
||||
|
||||
SocketPoller* m_socketPoller;
|
||||
wxTimer m_timeoutTimer;
|
||||
CURLM* m_handle;
|
||||
|
||||
// Mutex and condition are used together to signal to the worker thread to
|
||||
// wake up and mutex is also used to protected m_shuttingDown field.
|
||||
wxMutex m_mutex;
|
||||
wxCondition m_condition;
|
||||
bool m_shuttingDown;
|
||||
|
||||
// MT-safe vector of requests for which Cancel() was called.
|
||||
struct CancelledData
|
||||
{
|
||||
wxCriticalSection cs;
|
||||
wxVector< wxObjectDataPtr<wxWebRequestCURL> > requests;
|
||||
} m_cancelled;
|
||||
|
||||
static int ms_activeSessions;
|
||||
|
||||
wxDECLARE_NO_COPY_CLASS(wxWebSessionCURL);
|
||||
|
@@ -23,6 +23,19 @@
|
||||
#endif
|
||||
|
||||
#include "wx/uri.h"
|
||||
#include "wx/socket.h"
|
||||
#include "wx/msgqueue.h"
|
||||
#include "wx/hashset.h"
|
||||
#include "wx/hashmap.h"
|
||||
|
||||
#ifdef __WXMSW__
|
||||
#include <wx/msw/wrapwin.h>
|
||||
#else
|
||||
#include <sys/socket.h>
|
||||
#include <sys/select.h>
|
||||
#endif
|
||||
|
||||
|
||||
|
||||
// Define symbols that might be missing from older libcurl headers
|
||||
#ifndef CURL_AT_LEAST_VERSION
|
||||
@@ -361,6 +374,503 @@ void wxWebAuthChallengeCURL::SetCredentials(const wxWebCredentials& cred)
|
||||
m_request.StartRequest();
|
||||
}
|
||||
|
||||
|
||||
wxDECLARE_EVENT(wxSocketAction,wxThreadEvent);
|
||||
|
||||
class SocketPoller: public wxThreadHelper
|
||||
{
|
||||
public:
|
||||
enum PollAction
|
||||
{
|
||||
InvalidAction = 0,
|
||||
PollForRead = 1,
|
||||
PollForWrite = 2,
|
||||
PollForError = 4
|
||||
};
|
||||
|
||||
enum Result
|
||||
{
|
||||
InvalidResult= 0,
|
||||
ReadyForRead = 1,
|
||||
ReadyForWrite = 2,
|
||||
HasError = 4
|
||||
};
|
||||
|
||||
SocketPoller();
|
||||
~SocketPoller();
|
||||
bool StartPolling(wxSOCKET_T, int, wxEvtHandler*);
|
||||
void StopPolling(wxSOCKET_T);
|
||||
void ResumePolling(wxSOCKET_T);
|
||||
|
||||
private:
|
||||
enum
|
||||
{
|
||||
#ifdef INVALID_SOCKET
|
||||
SOCKET_POLLER_INVALID_SOCKET = INVALID_SOCKET
|
||||
#else
|
||||
SOCKET_POLLER_INVALID_SOCKET = ((wxSOCKET_T)(~0))
|
||||
#endif
|
||||
};
|
||||
|
||||
class Message
|
||||
{
|
||||
public:
|
||||
enum MessageId
|
||||
{
|
||||
AddSocketAction,
|
||||
DeleteSocketAction,
|
||||
ResumePolling,
|
||||
Quit,
|
||||
LastMessageId
|
||||
};
|
||||
|
||||
Message(MessageId i = LastMessageId,
|
||||
wxSOCKET_T s = SOCKET_POLLER_INVALID_SOCKET,
|
||||
int f = 0,
|
||||
wxEvtHandler* h = NULL)
|
||||
{
|
||||
m_messageId = i;
|
||||
m_socket = s;
|
||||
m_flags = f;
|
||||
m_evtHandler = h;
|
||||
}
|
||||
|
||||
MessageId GetMessageId() const {return m_messageId;}
|
||||
wxSOCKET_T GetSocket() const {return m_socket;}
|
||||
int GetFlags() const {return m_flags;}
|
||||
wxEvtHandler* GetEvtHandler() const {return m_evtHandler;}
|
||||
|
||||
private:
|
||||
wxSOCKET_T m_socket;
|
||||
int m_flags;
|
||||
MessageId m_messageId;
|
||||
wxEvtHandler* m_evtHandler;
|
||||
};
|
||||
|
||||
struct SocketData
|
||||
{
|
||||
int m_pollAction;
|
||||
void* m_event;
|
||||
wxEvtHandler* m_handler;
|
||||
};
|
||||
|
||||
WX_DECLARE_HASH_SET(wxSOCKET_T, wxIntegerHash, wxIntegerEqual, SocketSet );
|
||||
WX_DECLARE_HASH_MAP(wxSOCKET_T, SocketData, wxIntegerHash, wxIntegerEqual, \
|
||||
SocketDataMap );
|
||||
|
||||
// Items used from main thread.
|
||||
void CreateSocketPair();
|
||||
void CloseSocket(wxSOCKET_T);
|
||||
void PostAndSignal(const Message&);
|
||||
|
||||
SocketSet m_polledSockets;
|
||||
wxSOCKET_T m_writeEnd;
|
||||
|
||||
|
||||
// Items used in both threads.
|
||||
wxMessageQueue<Message> m_msgQueue;
|
||||
wxSOCKET_T m_readEnd;
|
||||
|
||||
|
||||
// Items used from worker thread.
|
||||
virtual wxThread::ExitCode Entry() wxOVERRIDE;
|
||||
|
||||
void ThreadRemoveFromDL(wxSOCKET_T);
|
||||
void ThreadSetSocketAction(wxSOCKET_T, int,wxEvtHandler*);
|
||||
void ThreadDeleteSocketAction(wxSOCKET_T);
|
||||
void ThreadCheckSockets();
|
||||
|
||||
SocketDataMap m_socketData;
|
||||
SocketSet m_disabledList;
|
||||
};
|
||||
|
||||
wxDEFINE_EVENT(wxSocketAction,wxThreadEvent);
|
||||
|
||||
|
||||
SocketPoller::SocketPoller()
|
||||
{
|
||||
m_writeEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET;
|
||||
m_readEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET;
|
||||
CreateSocketPair();
|
||||
|
||||
if (CreateThread(wxTHREAD_JOINABLE) != wxTHREAD_NO_ERROR)
|
||||
{
|
||||
wxLogDebug("Could not create socket poller worker thread!");
|
||||
return;
|
||||
}
|
||||
|
||||
if (GetThread()->Run() != wxTHREAD_NO_ERROR)
|
||||
{
|
||||
wxLogDebug("Could not run socket poller worker thread!");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
SocketPoller::~SocketPoller()
|
||||
{
|
||||
SocketPoller::Message msg(Message::Quit);
|
||||
PostAndSignal(msg);
|
||||
|
||||
GetThread()->Wait();
|
||||
|
||||
CloseSocket(m_writeEnd);
|
||||
CloseSocket(m_readEnd);
|
||||
|
||||
#ifdef __WXMSW__
|
||||
WSACleanup();
|
||||
#endif
|
||||
}
|
||||
|
||||
void SocketPoller::CreateSocketPair()
|
||||
{
|
||||
#ifdef __WXMSW__
|
||||
WORD wVersionRequested = MAKEWORD(1,1);
|
||||
WSADATA wsaData;
|
||||
WSAStartup(wVersionRequested, &wsaData);
|
||||
|
||||
m_writeEnd = socket(AF_INET, SOCK_DGRAM, 0);
|
||||
|
||||
if ( m_writeEnd == INVALID_SOCKET )
|
||||
{
|
||||
wxLogDebug("Unable to create write end of socket pair.");
|
||||
m_writeEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET;
|
||||
return;
|
||||
}
|
||||
|
||||
m_readEnd = socket(AF_INET, SOCK_DGRAM, 0);
|
||||
|
||||
if ( m_readEnd == INVALID_SOCKET )
|
||||
{
|
||||
wxLogDebug("Unable to create read end of socket pair.");
|
||||
CloseSocket(m_writeEnd);
|
||||
m_writeEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET;
|
||||
m_readEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET;
|
||||
return;
|
||||
}
|
||||
|
||||
// Bind the read end. This will assign it to an unused port.
|
||||
struct sockaddr_in serverAddr;
|
||||
serverAddr.sin_family = AF_INET;
|
||||
serverAddr.sin_port = htons(0);
|
||||
serverAddr.sin_addr.s_addr = inet_addr("127.0.0.1");
|
||||
|
||||
int retcode = bind(m_readEnd, (struct sockaddr *)&serverAddr,
|
||||
sizeof(serverAddr));
|
||||
if ( retcode < 0 )
|
||||
{
|
||||
wxLogDebug("Unable bind socket to port.");
|
||||
CloseSocket(m_writeEnd);
|
||||
m_writeEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET;
|
||||
CloseSocket(m_readEnd);
|
||||
m_readEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET;
|
||||
return;
|
||||
}
|
||||
|
||||
// Get the ip address and port of the read end.
|
||||
struct sockaddr_in readEndAddr;
|
||||
ZeroMemory(&readEndAddr, sizeof(readEndAddr));
|
||||
int len = sizeof(readEndAddr);
|
||||
getsockname(m_readEnd, (struct sockaddr *) &readEndAddr, &len);
|
||||
|
||||
// Unlike with stream sockets, this is a synchronous operation and just
|
||||
// sets location for the send function.
|
||||
connect(m_writeEnd, (struct sockaddr *)&readEndAddr, sizeof(readEndAddr));
|
||||
#else
|
||||
int fd[2];
|
||||
socketpair(AF_UNIX, SOCK_STREAM, 0, fd);
|
||||
|
||||
m_writeEnd = fd[0];
|
||||
m_readEnd = fd[1];
|
||||
#endif
|
||||
}
|
||||
|
||||
void SocketPoller::CloseSocket(wxSOCKET_T s)
|
||||
{
|
||||
#ifdef __WXMSW__
|
||||
closesocket(s);
|
||||
#else
|
||||
close(s);
|
||||
#endif
|
||||
}
|
||||
|
||||
bool SocketPoller::StartPolling(wxSOCKET_T s, int flags, wxEvtHandler* hndlr)
|
||||
{
|
||||
SocketSet::iterator it = m_polledSockets.find(s);
|
||||
|
||||
if ( m_polledSockets.size() > 1023 )
|
||||
{
|
||||
if ( it == m_polledSockets.end() )
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if ( it == m_polledSockets.end() )
|
||||
{
|
||||
m_polledSockets.insert(s);
|
||||
}
|
||||
|
||||
SocketPoller::Message msg(Message::AddSocketAction, s, flags, hndlr);
|
||||
PostAndSignal(msg);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void SocketPoller::StopPolling(wxSOCKET_T s)
|
||||
{
|
||||
SocketPoller::Message msg(Message::DeleteSocketAction, s);
|
||||
PostAndSignal(msg);
|
||||
|
||||
SocketSet::iterator it = m_polledSockets.find(s);
|
||||
|
||||
if ( it != m_polledSockets.end() )
|
||||
{
|
||||
m_polledSockets.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
void SocketPoller::ResumePolling(wxSOCKET_T s)
|
||||
{
|
||||
SocketPoller::Message msg(Message::ResumePolling, s);
|
||||
PostAndSignal(msg);
|
||||
}
|
||||
|
||||
void SocketPoller::PostAndSignal(const Message& msg)
|
||||
{
|
||||
m_msgQueue.Post(msg);
|
||||
|
||||
if ( m_writeEnd != SocketPoller::SOCKET_POLLER_INVALID_SOCKET )
|
||||
{
|
||||
char c = 32;
|
||||
send(m_writeEnd, &c, 1, 0);
|
||||
}
|
||||
}
|
||||
|
||||
wxThread::ExitCode SocketPoller::Entry()
|
||||
{
|
||||
wxMessageQueueError er;
|
||||
SocketPoller::Message msg;
|
||||
bool done = false;
|
||||
size_t socketsUnderConsideration = 0;
|
||||
int timeOut = 50;
|
||||
Message::MessageId id;
|
||||
|
||||
while ( !done )
|
||||
{
|
||||
if ( GetThread()->TestDestroy() )
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
// Process all messages in the message queue. If we currently have no
|
||||
// sockets, wait a little for a message to come in.
|
||||
timeOut = (socketsUnderConsideration == 0) ? 50 : 0;
|
||||
er = wxMSGQUEUE_NO_ERROR;
|
||||
|
||||
while ( er == wxMSGQUEUE_NO_ERROR )
|
||||
{
|
||||
er = m_msgQueue.ReceiveTimeout(timeOut, msg);
|
||||
id = msg.GetMessageId();
|
||||
|
||||
if ( er == wxMSGQUEUE_TIMEOUT )
|
||||
{
|
||||
break;
|
||||
}
|
||||
else if ( er == wxMSGQUEUE_MISC_ERROR )
|
||||
{
|
||||
wxLogDebug("Error with socket poller message queue.");
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
else if ( id == Message::Quit )
|
||||
{
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
else if ( id == Message::AddSocketAction )
|
||||
{
|
||||
wxSOCKET_T s = msg.GetSocket();
|
||||
int f = msg.GetFlags();
|
||||
wxEvtHandler* hndlr = msg.GetEvtHandler();
|
||||
ThreadSetSocketAction(s, f, hndlr);
|
||||
}
|
||||
else if ( id == Message::DeleteSocketAction )
|
||||
{
|
||||
wxSOCKET_T s = msg.GetSocket();
|
||||
ThreadDeleteSocketAction(s);
|
||||
}
|
||||
else if ( id == Message::ResumePolling )
|
||||
{
|
||||
wxSOCKET_T s = msg.GetSocket();
|
||||
ThreadRemoveFromDL(s);
|
||||
}
|
||||
|
||||
timeOut = 0;
|
||||
}
|
||||
|
||||
socketsUnderConsideration = m_socketData.size() - m_disabledList.size();
|
||||
|
||||
if ( socketsUnderConsideration > 0 && !done )
|
||||
{
|
||||
ThreadCheckSockets();
|
||||
}
|
||||
}
|
||||
|
||||
return static_cast<wxThread::ExitCode>(0);
|
||||
}
|
||||
|
||||
void SocketPoller::ThreadRemoveFromDL(wxSOCKET_T s)
|
||||
{
|
||||
SocketSet::iterator it = m_disabledList.find(s);
|
||||
|
||||
if ( it != m_disabledList.end() )
|
||||
{
|
||||
m_disabledList.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
void SocketPoller::ThreadDeleteSocketAction(wxSOCKET_T sock)
|
||||
{
|
||||
ThreadRemoveFromDL(sock);
|
||||
|
||||
SocketDataMap::iterator it = m_socketData.find(sock);
|
||||
|
||||
if ( it != m_socketData.end() )
|
||||
{
|
||||
m_socketData.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
void SocketPoller::ThreadSetSocketAction(wxSOCKET_T sock, int flags,
|
||||
wxEvtHandler* hndlr)
|
||||
{
|
||||
ThreadDeleteSocketAction(sock);
|
||||
|
||||
SocketData data;
|
||||
data.m_pollAction = flags;
|
||||
data.m_event = NULL;
|
||||
data.m_handler = hndlr;
|
||||
|
||||
m_socketData[sock] = data;
|
||||
}
|
||||
|
||||
void SocketPoller::ThreadCheckSockets()
|
||||
{
|
||||
fd_set readFds, writeFds, errorFds;
|
||||
FD_ZERO(&readFds);
|
||||
FD_ZERO(&writeFds);
|
||||
FD_ZERO(&errorFds);
|
||||
|
||||
wxSOCKET_T maxSd = 0;
|
||||
|
||||
if ( m_readEnd != SocketPoller::SOCKET_POLLER_INVALID_SOCKET )
|
||||
{
|
||||
FD_SET(m_readEnd, &readFds);
|
||||
maxSd = m_readEnd;
|
||||
}
|
||||
|
||||
for ( SocketDataMap::iterator it = m_socketData.begin() ;
|
||||
it != m_socketData.end() ; ++it )
|
||||
{
|
||||
wxSOCKET_T sock = it->first;
|
||||
|
||||
if ( m_disabledList.find(sock) != m_disabledList.end() )
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
int checkAction = it->second.m_pollAction;
|
||||
|
||||
if ( checkAction & PollForRead )
|
||||
{
|
||||
FD_SET(sock, &readFds);
|
||||
}
|
||||
|
||||
if ( checkAction & PollForWrite )
|
||||
{
|
||||
FD_SET(sock, &writeFds);
|
||||
}
|
||||
|
||||
FD_SET(sock, &errorFds);
|
||||
|
||||
if ( sock > maxSd )
|
||||
{
|
||||
maxSd = sock;
|
||||
}
|
||||
}
|
||||
|
||||
struct timeval timeout;
|
||||
timeout.tv_sec = 0; // 1s timeout
|
||||
timeout.tv_usec = 50*1000;
|
||||
|
||||
int selectStatus = select(maxSd+1, &readFds, &writeFds, &errorFds,&timeout);
|
||||
|
||||
if ( selectStatus < 0 )
|
||||
{
|
||||
// Massive error: do something
|
||||
}
|
||||
else if ( selectStatus == 0 )
|
||||
{
|
||||
;// select timed out. There is no need to do anything.
|
||||
}
|
||||
else
|
||||
{
|
||||
if ( (m_readEnd != SocketPoller::SOCKET_POLLER_INVALID_SOCKET)
|
||||
&& FD_ISSET(m_readEnd, &readFds) )
|
||||
{
|
||||
char c;
|
||||
recv(m_readEnd, &c, 1, 0);
|
||||
}
|
||||
|
||||
for ( SocketDataMap::iterator it = m_socketData.begin() ;
|
||||
it != m_socketData.end() ; ++it )
|
||||
{
|
||||
wxSOCKET_T sock = it->first;
|
||||
|
||||
if ( m_disabledList.find(sock) != m_disabledList.end() )
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
int checkActions = it->second.m_pollAction;
|
||||
wxEvtHandler* hndlr = it->second.m_handler;
|
||||
int result = InvalidResult;
|
||||
|
||||
if ( checkActions & PollForRead )
|
||||
{
|
||||
if ( FD_ISSET(sock, &readFds) )
|
||||
{
|
||||
result |= ReadyForRead;
|
||||
}
|
||||
}
|
||||
|
||||
if ( checkActions & ReadyForWrite )
|
||||
{
|
||||
if ( FD_ISSET(sock, &writeFds) )
|
||||
{
|
||||
result |= ReadyForWrite;
|
||||
}
|
||||
}
|
||||
|
||||
if ( FD_ISSET(sock, &errorFds) )
|
||||
{
|
||||
result |= HasError ;
|
||||
}
|
||||
|
||||
if ( result != InvalidResult && hndlr != NULL )
|
||||
{
|
||||
wxThreadEvent* event = new wxThreadEvent(wxSocketAction);
|
||||
event->SetPayload<wxSOCKET_T>(sock);
|
||||
event->SetInt(result);
|
||||
wxQueueEvent(hndlr,event);
|
||||
m_disabledList.insert(sock);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
//
|
||||
// wxWebSessionCURL
|
||||
//
|
||||
@@ -368,9 +878,7 @@ void wxWebAuthChallengeCURL::SetCredentials(const wxWebCredentials& cred)
|
||||
int wxWebSessionCURL::ms_activeSessions = 0;
|
||||
|
||||
wxWebSessionCURL::wxWebSessionCURL() :
|
||||
m_handle(NULL),
|
||||
m_condition(m_mutex),
|
||||
m_shuttingDown(false)
|
||||
m_handle(NULL)
|
||||
{
|
||||
// Initialize CURL globally if no sessions are active
|
||||
if ( ms_activeSessions == 0 )
|
||||
@@ -380,20 +888,16 @@ wxWebSessionCURL::wxWebSessionCURL() :
|
||||
}
|
||||
|
||||
ms_activeSessions++;
|
||||
|
||||
m_socketPoller = new SocketPoller();
|
||||
m_timeoutTimer.SetOwner(this);
|
||||
Bind(wxEVT_TIMER ,&wxWebSessionCURL::TimeoutNotification, this);
|
||||
Bind(wxSocketAction, &wxWebSessionCURL::ProcessSocketPollerResult, this);
|
||||
}
|
||||
|
||||
wxWebSessionCURL::~wxWebSessionCURL()
|
||||
{
|
||||
{
|
||||
// Notify the work thread
|
||||
wxMutexLocker lock(m_mutex);
|
||||
m_shuttingDown = true;
|
||||
m_condition.Signal();
|
||||
}
|
||||
|
||||
// Wait for work thread to finish
|
||||
if ( GetThread() && GetThread()->IsRunning() )
|
||||
GetThread()->Wait(wxTHREAD_WAIT_BLOCK);
|
||||
delete m_socketPoller;
|
||||
|
||||
if ( m_handle )
|
||||
curl_multi_cleanup(m_handle);
|
||||
@@ -419,147 +923,37 @@ wxWebSessionCURL::CreateRequest(wxWebSession& session,
|
||||
wxLogDebug("curl_multi_init() failed");
|
||||
return wxWebRequestImplPtr();
|
||||
}
|
||||
else
|
||||
{
|
||||
curl_multi_setopt(m_handle, CURLMOPT_SOCKETDATA, this);
|
||||
curl_multi_setopt(m_handle, CURLMOPT_SOCKETFUNCTION,SocketCallback);
|
||||
curl_multi_setopt(m_handle, CURLMOPT_TIMERDATA, this);
|
||||
curl_multi_setopt(m_handle, CURLMOPT_TIMERFUNCTION, TimerCallback);
|
||||
}
|
||||
}
|
||||
|
||||
return wxWebRequestImplPtr(new wxWebRequestCURL(session, *this, handler, url, id));
|
||||
}
|
||||
|
||||
static CURLMcode wx_curl_multi_wait(CURLM *multi_handle, int timeout_ms,
|
||||
int *ret)
|
||||
{
|
||||
// since libcurl 7.28.0 the curl_multi_wait method is more convient than
|
||||
// calling multiple curl_multi_... methods.
|
||||
// When support for older libcurl versions is dropped this wrapper can be
|
||||
// removed.
|
||||
#if wxCURL_HAVE_MULTI_WAIT
|
||||
return curl_multi_wait(multi_handle, NULL, 0, timeout_ms, ret);
|
||||
#else
|
||||
wxASSERT(ret != NULL);
|
||||
|
||||
fd_set fdread;
|
||||
fd_set fdwrite;
|
||||
fd_set fdexcep;
|
||||
timeval timeout;
|
||||
|
||||
long curl_timeo;
|
||||
|
||||
curl_multi_timeout(multi_handle, &curl_timeo);
|
||||
if ( curl_timeo < 0 )
|
||||
curl_timeo = timeout_ms;
|
||||
|
||||
timeout.tv_sec = curl_timeo / 1000;
|
||||
timeout.tv_usec = (curl_timeo % 1000) * 1000;
|
||||
|
||||
FD_ZERO(&fdread);
|
||||
FD_ZERO(&fdwrite);
|
||||
FD_ZERO(&fdexcep);
|
||||
|
||||
curl_multi_fdset(multi_handle, &fdread, &fdwrite, &fdexcep, ret);
|
||||
if ( *ret == -1 )
|
||||
return CURLM_OK;
|
||||
else if ( select(*ret + 1, &fdread, &fdwrite, &fdexcep, &timeout) == -1 )
|
||||
return CURLM_BAD_SOCKET;
|
||||
else
|
||||
return CURLM_OK;
|
||||
#endif
|
||||
}
|
||||
|
||||
wxThread::ExitCode wxWebSessionCURL::Entry()
|
||||
{
|
||||
// This mutex will be unlocked only while we're waiting on the condition.
|
||||
wxMutexLocker lock(m_mutex);
|
||||
|
||||
int activeRequests = -1;
|
||||
int repeats = 0;
|
||||
|
||||
while ( activeRequests )
|
||||
{
|
||||
// Handle cancelled requests
|
||||
{
|
||||
wxCriticalSectionLocker cancelledLock(m_cancelled.cs);
|
||||
while ( !m_cancelled.requests.empty() )
|
||||
{
|
||||
wxObjectDataPtr<wxWebRequestCURL> request(m_cancelled.requests.back());
|
||||
m_cancelled.requests.pop_back();
|
||||
curl_multi_remove_handle(m_handle, request->GetHandle());
|
||||
request->SetState(wxWebRequest::State_Cancelled);
|
||||
}
|
||||
}
|
||||
|
||||
// Instruct CURL to work on requests
|
||||
curl_multi_perform(m_handle, &activeRequests);
|
||||
|
||||
// Process CURL message queue
|
||||
int msgQueueCount;
|
||||
while ( CURLMsg* msg = curl_multi_info_read(m_handle, &msgQueueCount) )
|
||||
{
|
||||
if ( msg->msg == CURLMSG_DONE )
|
||||
{
|
||||
wxWebRequestCURL* request;
|
||||
curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &request);
|
||||
curl_multi_remove_handle(m_handle, msg->easy_handle);
|
||||
request->HandleCompletion();
|
||||
}
|
||||
}
|
||||
|
||||
if ( activeRequests )
|
||||
{
|
||||
// Wait for CURL work to finish
|
||||
int numfds;
|
||||
wx_curl_multi_wait(m_handle, 500, &numfds);
|
||||
|
||||
if ( !numfds )
|
||||
{
|
||||
repeats++; // count number of repeated zero numfds
|
||||
if ( repeats > 1 )
|
||||
wxMilliSleep(100);
|
||||
}
|
||||
else
|
||||
repeats = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Wait for new requests or shutdown of the session
|
||||
m_condition.Wait();
|
||||
if ( !m_shuttingDown )
|
||||
activeRequests = -1;
|
||||
}
|
||||
}
|
||||
|
||||
return (wxThread::ExitCode)0;
|
||||
}
|
||||
|
||||
bool wxWebSessionCURL::StartRequest(wxWebRequestCURL & request)
|
||||
{
|
||||
// Add request easy handle to multi handle
|
||||
curl_multi_add_handle(m_handle, request.GetHandle());
|
||||
|
||||
// Create and start session thread if not yet running
|
||||
if ( !GetThread() )
|
||||
{
|
||||
if ( CreateThread() )
|
||||
return false;
|
||||
|
||||
if ( GetThread()->Run() != wxTHREAD_NO_ERROR )
|
||||
return false;
|
||||
}
|
||||
CURL* curl = request.GetHandle();
|
||||
curl_multi_add_handle(m_handle, curl);
|
||||
|
||||
request.SetState(wxWebRequest::State_Active);
|
||||
|
||||
// Signal the worker thread to resume work
|
||||
wxMutexLocker lock(m_mutex);
|
||||
m_condition.Signal();
|
||||
// Report a timeout to curl to initiate this transfer.
|
||||
int runningHandles;
|
||||
curl_multi_socket_action(m_handle, CURL_SOCKET_TIMEOUT, 0, &runningHandles);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void wxWebSessionCURL::CancelRequest(wxWebRequestCURL* request)
|
||||
{
|
||||
// Add the request to a list of threads that will be removed from the curl
|
||||
// multi handle in the worker thread
|
||||
wxCriticalSectionLocker lock(m_cancelled.cs);
|
||||
request->IncRef();
|
||||
m_cancelled.requests.push_back(wxObjectDataPtr<wxWebRequestCURL>(request));
|
||||
curl_multi_remove_handle(m_handle, request->GetHandle());
|
||||
request->SetState(wxWebRequest::State_Cancelled);
|
||||
}
|
||||
|
||||
wxVersionInfo wxWebSessionCURL::GetLibraryVersionInfo()
|
||||
@@ -575,4 +969,164 @@ wxVersionInfo wxWebSessionCURL::GetLibraryVersionInfo()
|
||||
desc);
|
||||
}
|
||||
|
||||
// curl interacts with the wxWebSessionCURL class through 2 callback functions
|
||||
// 1) TimerCallback is called whenever curl wants us to start or stop a timer.
|
||||
// 2) SocketCallback is called when curl wants us to start monitoring a socket
|
||||
// for activity.
|
||||
//
|
||||
// curl accomplishes the network transfers by calling the
|
||||
// curl_multi_socket_action function to move pieces of the transfer to or from
|
||||
// the system's network services. Consequently we call this function when a
|
||||
// timeout occurs or when activity is detected on a socket.
|
||||
|
||||
int wxWebSessionCURL::TimerCallback(CURLM* WXUNUSED(multi), long timeoutms,
|
||||
void *userp)
|
||||
{
|
||||
wxWebSessionCURL* session = reinterpret_cast<wxWebSessionCURL*>(userp);
|
||||
session->ProcessTimerCallback(timeoutms);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int wxWebSessionCURL::SocketCallback(CURL* WXUNUSED(easy), curl_socket_t sock,
|
||||
int what, void* userp, void* WXUNUSED(sp))
|
||||
{
|
||||
wxWebSessionCURL* session = reinterpret_cast<wxWebSessionCURL*>(userp);
|
||||
session->ProcessSocketCallback(sock, what);
|
||||
return CURLM_OK;
|
||||
};
|
||||
|
||||
void wxWebSessionCURL::ProcessTimerCallback(long timeoutms)
|
||||
{
|
||||
// When this callback is called, curl wants us to start or stop a timer.
|
||||
// If timeoutms = -1, we should stop the timer. If timeoutms > 0, we should
|
||||
// start a oneshot timer and when that timer expires, we should call
|
||||
// curl_multi_socket_action(m_handle, CURL_SOCKET_TIMEOUT,...
|
||||
//
|
||||
// In the special case that timeoutms = 0, we should signal a timeout as
|
||||
// soon as possible (as above by calling curl_multi_socket_action). But
|
||||
// according to the curl documentation, we can't do that from this callback
|
||||
// or we might cause an infinite loop. So use CallAfter to report the
|
||||
// timeout at a slightly later time.
|
||||
|
||||
if ( timeoutms > 0)
|
||||
{
|
||||
m_timeoutTimer.StartOnce(timeoutms);
|
||||
}
|
||||
else if ( timeoutms < 0 )
|
||||
{
|
||||
m_timeoutTimer.Stop();
|
||||
}
|
||||
else // timeoutms == 0
|
||||
{
|
||||
CallAfter(&wxWebSessionCURL::ProcessTimeoutNotification);
|
||||
}
|
||||
}
|
||||
|
||||
void wxWebSessionCURL::TimeoutNotification(wxTimerEvent& WXUNUSED(event))
|
||||
{
|
||||
ProcessTimeoutNotification();
|
||||
}
|
||||
|
||||
void wxWebSessionCURL::ProcessTimeoutNotification()
|
||||
{
|
||||
int runningHandles;
|
||||
curl_multi_socket_action(m_handle, CURL_SOCKET_TIMEOUT, 0,&runningHandles);
|
||||
|
||||
CheckForCompletedTransfers();
|
||||
}
|
||||
|
||||
static int CurlPoll2SocketPoller(int what)
|
||||
{
|
||||
int pollAction = SocketPoller::InvalidAction;
|
||||
|
||||
if ( what == CURL_POLL_IN )
|
||||
{
|
||||
pollAction = SocketPoller::PollForRead ;
|
||||
}
|
||||
else if ( what == CURL_POLL_OUT )
|
||||
{
|
||||
pollAction = SocketPoller::PollForWrite;
|
||||
}
|
||||
else if ( what == CURL_POLL_INOUT )
|
||||
{
|
||||
pollAction = SocketPoller::PollForRead | SocketPoller::PollForWrite;
|
||||
}
|
||||
|
||||
return pollAction;
|
||||
}
|
||||
|
||||
void wxWebSessionCURL::ProcessSocketCallback(curl_socket_t s, int what)
|
||||
{
|
||||
// Have the socket poller start or stop monitoring a socket depending of
|
||||
// the value of what.
|
||||
|
||||
switch ( what )
|
||||
{
|
||||
case CURL_POLL_IN:
|
||||
wxFALLTHROUGH;
|
||||
case CURL_POLL_OUT:
|
||||
wxFALLTHROUGH;
|
||||
case CURL_POLL_INOUT:
|
||||
m_socketPoller->StartPolling(s, CurlPoll2SocketPoller(what), this);
|
||||
break;
|
||||
case CURL_POLL_REMOVE:
|
||||
m_socketPoller->StopPolling(s);
|
||||
break;
|
||||
default:
|
||||
wxLogDebug("Unknown socket action in ProcessSocketCallback");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
static int SocketPollerResult2CurlSelect(int socketEventFlag)
|
||||
{
|
||||
int curlSelect = 0;
|
||||
|
||||
if ( socketEventFlag & SocketPoller::ReadyForRead )
|
||||
{
|
||||
curlSelect |= CURL_CSELECT_IN;
|
||||
}
|
||||
|
||||
if ( socketEventFlag & SocketPoller::ReadyForWrite )
|
||||
{
|
||||
curlSelect |= CURL_CSELECT_OUT;
|
||||
}
|
||||
|
||||
if ( socketEventFlag & SocketPoller::HasError )
|
||||
{
|
||||
curlSelect |= CURL_CSELECT_ERR;
|
||||
}
|
||||
|
||||
return curlSelect;
|
||||
}
|
||||
|
||||
void wxWebSessionCURL::ProcessSocketPollerResult(wxThreadEvent& event)
|
||||
{
|
||||
// Convert the socket poller result to an action flag needed by curl.
|
||||
// Then call curl_multi_socket_action.
|
||||
curl_socket_t sock = event.GetPayload<curl_socket_t>();
|
||||
int action = SocketPollerResult2CurlSelect(event.GetInt());
|
||||
int runningHandles;
|
||||
curl_multi_socket_action(m_handle, sock, action, &runningHandles);
|
||||
|
||||
CheckForCompletedTransfers();
|
||||
m_socketPoller->ResumePolling(sock);
|
||||
}
|
||||
|
||||
void wxWebSessionCURL::CheckForCompletedTransfers()
|
||||
{
|
||||
// Process CURL message queue
|
||||
int msgQueueCount;
|
||||
while ( CURLMsg* msg = curl_multi_info_read(m_handle, &msgQueueCount) )
|
||||
{
|
||||
if ( msg->msg == CURLMSG_DONE )
|
||||
{
|
||||
wxWebRequestCURL* request;
|
||||
curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &request);
|
||||
curl_multi_remove_handle(m_handle, msg->easy_handle);
|
||||
request->HandleCompletion();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#endif // wxUSE_WEBREQUEST_CURL
|
||||
|
Reference in New Issue
Block a user