diff --git a/include/wx/private/webrequest_curl.h b/include/wx/private/webrequest_curl.h index ec38041785..b0aa565584 100644 --- a/include/wx/private/webrequest_curl.h +++ b/include/wx/private/webrequest_curl.h @@ -16,12 +16,14 @@ #include "wx/thread.h" #include "wx/vector.h" +#include "wx/timer.h" #include "curl/curl.h" class wxWebRequestCURL; class wxWebResponseCURL; class wxWebSessionCURL; +class SocketPoller; class wxWebAuthChallengeCURL : public wxWebAuthChallengeImpl { @@ -123,7 +125,7 @@ private: wxDECLARE_NO_COPY_CLASS(wxWebResponseCURL); }; -class wxWebSessionCURL : public wxWebSessionImpl, private wxThreadHelper +class wxWebSessionCURL : public wxWebSessionImpl, public wxEvtHandler { public: wxWebSessionCURL(); @@ -147,25 +149,21 @@ public: void CancelRequest(wxWebRequestCURL* request); -protected: - wxThread::ExitCode Entry() wxOVERRIDE; - private: + static int TimerCallback(CURLM*, long, void*); + static int SocketCallback(CURL*, curl_socket_t, int, void*, void*); + + void ProcessTimerCallback(long); + void TimeoutNotification(wxTimerEvent&); + void ProcessTimeoutNotification(); + void ProcessSocketCallback(curl_socket_t, int); + void ProcessSocketPollerResult(wxThreadEvent&); + void CheckForCompletedTransfers(); + + SocketPoller* m_socketPoller; + wxTimer m_timeoutTimer; CURLM* m_handle; - // Mutex and condition are used together to signal to the worker thread to - // wake up and mutex is also used to protected m_shuttingDown field. - wxMutex m_mutex; - wxCondition m_condition; - bool m_shuttingDown; - - // MT-safe vector of requests for which Cancel() was called. - struct CancelledData - { - wxCriticalSection cs; - wxVector< wxObjectDataPtr > requests; - } m_cancelled; - static int ms_activeSessions; wxDECLARE_NO_COPY_CLASS(wxWebSessionCURL); diff --git a/src/common/webrequest_curl.cpp b/src/common/webrequest_curl.cpp index e6e47e567c..d82c41e43f 100644 --- a/src/common/webrequest_curl.cpp +++ b/src/common/webrequest_curl.cpp @@ -23,6 +23,19 @@ #endif #include "wx/uri.h" +#include "wx/socket.h" +#include "wx/msgqueue.h" +#include "wx/hashset.h" +#include "wx/hashmap.h" + +#ifdef __WXMSW__ + #include +#else + #include + #include +#endif + + // Define symbols that might be missing from older libcurl headers #ifndef CURL_AT_LEAST_VERSION @@ -361,6 +374,503 @@ void wxWebAuthChallengeCURL::SetCredentials(const wxWebCredentials& cred) m_request.StartRequest(); } + +wxDECLARE_EVENT(wxSocketAction,wxThreadEvent); + +class SocketPoller: public wxThreadHelper +{ +public: + enum PollAction + { + InvalidAction = 0, + PollForRead = 1, + PollForWrite = 2, + PollForError = 4 + }; + + enum Result + { + InvalidResult= 0, + ReadyForRead = 1, + ReadyForWrite = 2, + HasError = 4 + }; + + SocketPoller(); + ~SocketPoller(); + bool StartPolling(wxSOCKET_T, int, wxEvtHandler*); + void StopPolling(wxSOCKET_T); + void ResumePolling(wxSOCKET_T); + +private: + enum + { +#ifdef INVALID_SOCKET + SOCKET_POLLER_INVALID_SOCKET = INVALID_SOCKET +#else + SOCKET_POLLER_INVALID_SOCKET = ((wxSOCKET_T)(~0)) +#endif + }; + + class Message + { + public: + enum MessageId + { + AddSocketAction, + DeleteSocketAction, + ResumePolling, + Quit, + LastMessageId + }; + + Message(MessageId i = LastMessageId, + wxSOCKET_T s = SOCKET_POLLER_INVALID_SOCKET, + int f = 0, + wxEvtHandler* h = NULL) + { + m_messageId = i; + m_socket = s; + m_flags = f; + m_evtHandler = h; + } + + MessageId GetMessageId() const {return m_messageId;} + wxSOCKET_T GetSocket() const {return m_socket;} + int GetFlags() const {return m_flags;} + wxEvtHandler* GetEvtHandler() const {return m_evtHandler;} + + private: + wxSOCKET_T m_socket; + int m_flags; + MessageId m_messageId; + wxEvtHandler* m_evtHandler; + }; + + struct SocketData + { + int m_pollAction; + void* m_event; + wxEvtHandler* m_handler; + }; + + WX_DECLARE_HASH_SET(wxSOCKET_T, wxIntegerHash, wxIntegerEqual, SocketSet ); + WX_DECLARE_HASH_MAP(wxSOCKET_T, SocketData, wxIntegerHash, wxIntegerEqual, \ + SocketDataMap ); + + // Items used from main thread. + void CreateSocketPair(); + void CloseSocket(wxSOCKET_T); + void PostAndSignal(const Message&); + + SocketSet m_polledSockets; + wxSOCKET_T m_writeEnd; + + + // Items used in both threads. + wxMessageQueue m_msgQueue; + wxSOCKET_T m_readEnd; + + + // Items used from worker thread. + virtual wxThread::ExitCode Entry() wxOVERRIDE; + + void ThreadRemoveFromDL(wxSOCKET_T); + void ThreadSetSocketAction(wxSOCKET_T, int,wxEvtHandler*); + void ThreadDeleteSocketAction(wxSOCKET_T); + void ThreadCheckSockets(); + + SocketDataMap m_socketData; + SocketSet m_disabledList; +}; + +wxDEFINE_EVENT(wxSocketAction,wxThreadEvent); + + +SocketPoller::SocketPoller() +{ + m_writeEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET; + m_readEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET; + CreateSocketPair(); + + if (CreateThread(wxTHREAD_JOINABLE) != wxTHREAD_NO_ERROR) + { + wxLogDebug("Could not create socket poller worker thread!"); + return; + } + + if (GetThread()->Run() != wxTHREAD_NO_ERROR) + { + wxLogDebug("Could not run socket poller worker thread!"); + return; + } +} + +SocketPoller::~SocketPoller() +{ + SocketPoller::Message msg(Message::Quit); + PostAndSignal(msg); + + GetThread()->Wait(); + + CloseSocket(m_writeEnd); + CloseSocket(m_readEnd); + +#ifdef __WXMSW__ + WSACleanup(); +#endif +} + +void SocketPoller::CreateSocketPair() +{ +#ifdef __WXMSW__ + WORD wVersionRequested = MAKEWORD(1,1); + WSADATA wsaData; + WSAStartup(wVersionRequested, &wsaData); + + m_writeEnd = socket(AF_INET, SOCK_DGRAM, 0); + + if ( m_writeEnd == INVALID_SOCKET ) + { + wxLogDebug("Unable to create write end of socket pair."); + m_writeEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET; + return; + } + + m_readEnd = socket(AF_INET, SOCK_DGRAM, 0); + + if ( m_readEnd == INVALID_SOCKET ) + { + wxLogDebug("Unable to create read end of socket pair."); + CloseSocket(m_writeEnd); + m_writeEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET; + m_readEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET; + return; + } + + // Bind the read end. This will assign it to an unused port. + struct sockaddr_in serverAddr; + serverAddr.sin_family = AF_INET; + serverAddr.sin_port = htons(0); + serverAddr.sin_addr.s_addr = inet_addr("127.0.0.1"); + + int retcode = bind(m_readEnd, (struct sockaddr *)&serverAddr, + sizeof(serverAddr)); + if ( retcode < 0 ) + { + wxLogDebug("Unable bind socket to port."); + CloseSocket(m_writeEnd); + m_writeEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET; + CloseSocket(m_readEnd); + m_readEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET; + return; + } + + // Get the ip address and port of the read end. + struct sockaddr_in readEndAddr; + ZeroMemory(&readEndAddr, sizeof(readEndAddr)); + int len = sizeof(readEndAddr); + getsockname(m_readEnd, (struct sockaddr *) &readEndAddr, &len); + + // Unlike with stream sockets, this is a synchronous operation and just + // sets location for the send function. + connect(m_writeEnd, (struct sockaddr *)&readEndAddr, sizeof(readEndAddr)); +#else + int fd[2]; + socketpair(AF_UNIX, SOCK_STREAM, 0, fd); + + m_writeEnd = fd[0]; + m_readEnd = fd[1]; +#endif +} + +void SocketPoller::CloseSocket(wxSOCKET_T s) +{ +#ifdef __WXMSW__ + closesocket(s); +#else + close(s); +#endif +} + +bool SocketPoller::StartPolling(wxSOCKET_T s, int flags, wxEvtHandler* hndlr) +{ + SocketSet::iterator it = m_polledSockets.find(s); + + if ( m_polledSockets.size() > 1023 ) + { + if ( it == m_polledSockets.end() ) + { + return false; + } + } + + if ( it == m_polledSockets.end() ) + { + m_polledSockets.insert(s); + } + + SocketPoller::Message msg(Message::AddSocketAction, s, flags, hndlr); + PostAndSignal(msg); + + return true; +} + +void SocketPoller::StopPolling(wxSOCKET_T s) +{ + SocketPoller::Message msg(Message::DeleteSocketAction, s); + PostAndSignal(msg); + + SocketSet::iterator it = m_polledSockets.find(s); + + if ( it != m_polledSockets.end() ) + { + m_polledSockets.erase(it); + } +} + +void SocketPoller::ResumePolling(wxSOCKET_T s) +{ + SocketPoller::Message msg(Message::ResumePolling, s); + PostAndSignal(msg); +} + +void SocketPoller::PostAndSignal(const Message& msg) +{ + m_msgQueue.Post(msg); + + if ( m_writeEnd != SocketPoller::SOCKET_POLLER_INVALID_SOCKET ) + { + char c = 32; + send(m_writeEnd, &c, 1, 0); + } +} + +wxThread::ExitCode SocketPoller::Entry() +{ + wxMessageQueueError er; + SocketPoller::Message msg; + bool done = false; + size_t socketsUnderConsideration = 0; + int timeOut = 50; + Message::MessageId id; + + while ( !done ) + { + if ( GetThread()->TestDestroy() ) + { + break; + } + + // Process all messages in the message queue. If we currently have no + // sockets, wait a little for a message to come in. + timeOut = (socketsUnderConsideration == 0) ? 50 : 0; + er = wxMSGQUEUE_NO_ERROR; + + while ( er == wxMSGQUEUE_NO_ERROR ) + { + er = m_msgQueue.ReceiveTimeout(timeOut, msg); + id = msg.GetMessageId(); + + if ( er == wxMSGQUEUE_TIMEOUT ) + { + break; + } + else if ( er == wxMSGQUEUE_MISC_ERROR ) + { + wxLogDebug("Error with socket poller message queue."); + done = true; + break; + } + else if ( id == Message::Quit ) + { + done = true; + break; + } + else if ( id == Message::AddSocketAction ) + { + wxSOCKET_T s = msg.GetSocket(); + int f = msg.GetFlags(); + wxEvtHandler* hndlr = msg.GetEvtHandler(); + ThreadSetSocketAction(s, f, hndlr); + } + else if ( id == Message::DeleteSocketAction ) + { + wxSOCKET_T s = msg.GetSocket(); + ThreadDeleteSocketAction(s); + } + else if ( id == Message::ResumePolling ) + { + wxSOCKET_T s = msg.GetSocket(); + ThreadRemoveFromDL(s); + } + + timeOut = 0; + } + + socketsUnderConsideration = m_socketData.size() - m_disabledList.size(); + + if ( socketsUnderConsideration > 0 && !done ) + { + ThreadCheckSockets(); + } + } + + return static_cast(0); +} + +void SocketPoller::ThreadRemoveFromDL(wxSOCKET_T s) +{ + SocketSet::iterator it = m_disabledList.find(s); + + if ( it != m_disabledList.end() ) + { + m_disabledList.erase(it); + } +} + +void SocketPoller::ThreadDeleteSocketAction(wxSOCKET_T sock) +{ + ThreadRemoveFromDL(sock); + + SocketDataMap::iterator it = m_socketData.find(sock); + + if ( it != m_socketData.end() ) + { + m_socketData.erase(it); + } +} + +void SocketPoller::ThreadSetSocketAction(wxSOCKET_T sock, int flags, + wxEvtHandler* hndlr) +{ + ThreadDeleteSocketAction(sock); + + SocketData data; + data.m_pollAction = flags; + data.m_event = NULL; + data.m_handler = hndlr; + + m_socketData[sock] = data; +} + +void SocketPoller::ThreadCheckSockets() +{ + fd_set readFds, writeFds, errorFds; + FD_ZERO(&readFds); + FD_ZERO(&writeFds); + FD_ZERO(&errorFds); + + wxSOCKET_T maxSd = 0; + + if ( m_readEnd != SocketPoller::SOCKET_POLLER_INVALID_SOCKET ) + { + FD_SET(m_readEnd, &readFds); + maxSd = m_readEnd; + } + + for ( SocketDataMap::iterator it = m_socketData.begin() ; + it != m_socketData.end() ; ++it ) + { + wxSOCKET_T sock = it->first; + + if ( m_disabledList.find(sock) != m_disabledList.end() ) + { + continue; + } + + int checkAction = it->second.m_pollAction; + + if ( checkAction & PollForRead ) + { + FD_SET(sock, &readFds); + } + + if ( checkAction & PollForWrite ) + { + FD_SET(sock, &writeFds); + } + + FD_SET(sock, &errorFds); + + if ( sock > maxSd ) + { + maxSd = sock; + } + } + + struct timeval timeout; + timeout.tv_sec = 0; // 1s timeout + timeout.tv_usec = 50*1000; + + int selectStatus = select(maxSd+1, &readFds, &writeFds, &errorFds,&timeout); + + if ( selectStatus < 0 ) + { + // Massive error: do something + } + else if ( selectStatus == 0 ) + { + ;// select timed out. There is no need to do anything. + } + else + { + if ( (m_readEnd != SocketPoller::SOCKET_POLLER_INVALID_SOCKET) + && FD_ISSET(m_readEnd, &readFds) ) + { + char c; + recv(m_readEnd, &c, 1, 0); + } + + for ( SocketDataMap::iterator it = m_socketData.begin() ; + it != m_socketData.end() ; ++it ) + { + wxSOCKET_T sock = it->first; + + if ( m_disabledList.find(sock) != m_disabledList.end() ) + { + continue; + } + + int checkActions = it->second.m_pollAction; + wxEvtHandler* hndlr = it->second.m_handler; + int result = InvalidResult; + + if ( checkActions & PollForRead ) + { + if ( FD_ISSET(sock, &readFds) ) + { + result |= ReadyForRead; + } + } + + if ( checkActions & ReadyForWrite ) + { + if ( FD_ISSET(sock, &writeFds) ) + { + result |= ReadyForWrite; + } + } + + if ( FD_ISSET(sock, &errorFds) ) + { + result |= HasError ; + } + + if ( result != InvalidResult && hndlr != NULL ) + { + wxThreadEvent* event = new wxThreadEvent(wxSocketAction); + event->SetPayload(sock); + event->SetInt(result); + wxQueueEvent(hndlr,event); + m_disabledList.insert(sock); + } + } + } +} + + + // // wxWebSessionCURL // @@ -368,9 +878,7 @@ void wxWebAuthChallengeCURL::SetCredentials(const wxWebCredentials& cred) int wxWebSessionCURL::ms_activeSessions = 0; wxWebSessionCURL::wxWebSessionCURL() : - m_handle(NULL), - m_condition(m_mutex), - m_shuttingDown(false) + m_handle(NULL) { // Initialize CURL globally if no sessions are active if ( ms_activeSessions == 0 ) @@ -380,20 +888,16 @@ wxWebSessionCURL::wxWebSessionCURL() : } ms_activeSessions++; + + m_socketPoller = new SocketPoller(); + m_timeoutTimer.SetOwner(this); + Bind(wxEVT_TIMER ,&wxWebSessionCURL::TimeoutNotification, this); + Bind(wxSocketAction, &wxWebSessionCURL::ProcessSocketPollerResult, this); } wxWebSessionCURL::~wxWebSessionCURL() { - { - // Notify the work thread - wxMutexLocker lock(m_mutex); - m_shuttingDown = true; - m_condition.Signal(); - } - - // Wait for work thread to finish - if ( GetThread() && GetThread()->IsRunning() ) - GetThread()->Wait(wxTHREAD_WAIT_BLOCK); + delete m_socketPoller; if ( m_handle ) curl_multi_cleanup(m_handle); @@ -419,147 +923,37 @@ wxWebSessionCURL::CreateRequest(wxWebSession& session, wxLogDebug("curl_multi_init() failed"); return wxWebRequestImplPtr(); } + else + { + curl_multi_setopt(m_handle, CURLMOPT_SOCKETDATA, this); + curl_multi_setopt(m_handle, CURLMOPT_SOCKETFUNCTION,SocketCallback); + curl_multi_setopt(m_handle, CURLMOPT_TIMERDATA, this); + curl_multi_setopt(m_handle, CURLMOPT_TIMERFUNCTION, TimerCallback); + } } return wxWebRequestImplPtr(new wxWebRequestCURL(session, *this, handler, url, id)); } -static CURLMcode wx_curl_multi_wait(CURLM *multi_handle, int timeout_ms, - int *ret) -{ - // since libcurl 7.28.0 the curl_multi_wait method is more convient than - // calling multiple curl_multi_... methods. - // When support for older libcurl versions is dropped this wrapper can be - // removed. -#if wxCURL_HAVE_MULTI_WAIT - return curl_multi_wait(multi_handle, NULL, 0, timeout_ms, ret); -#else - wxASSERT(ret != NULL); - - fd_set fdread; - fd_set fdwrite; - fd_set fdexcep; - timeval timeout; - - long curl_timeo; - - curl_multi_timeout(multi_handle, &curl_timeo); - if ( curl_timeo < 0 ) - curl_timeo = timeout_ms; - - timeout.tv_sec = curl_timeo / 1000; - timeout.tv_usec = (curl_timeo % 1000) * 1000; - - FD_ZERO(&fdread); - FD_ZERO(&fdwrite); - FD_ZERO(&fdexcep); - - curl_multi_fdset(multi_handle, &fdread, &fdwrite, &fdexcep, ret); - if ( *ret == -1 ) - return CURLM_OK; - else if ( select(*ret + 1, &fdread, &fdwrite, &fdexcep, &timeout) == -1 ) - return CURLM_BAD_SOCKET; - else - return CURLM_OK; -#endif -} - -wxThread::ExitCode wxWebSessionCURL::Entry() -{ - // This mutex will be unlocked only while we're waiting on the condition. - wxMutexLocker lock(m_mutex); - - int activeRequests = -1; - int repeats = 0; - - while ( activeRequests ) - { - // Handle cancelled requests - { - wxCriticalSectionLocker cancelledLock(m_cancelled.cs); - while ( !m_cancelled.requests.empty() ) - { - wxObjectDataPtr request(m_cancelled.requests.back()); - m_cancelled.requests.pop_back(); - curl_multi_remove_handle(m_handle, request->GetHandle()); - request->SetState(wxWebRequest::State_Cancelled); - } - } - - // Instruct CURL to work on requests - curl_multi_perform(m_handle, &activeRequests); - - // Process CURL message queue - int msgQueueCount; - while ( CURLMsg* msg = curl_multi_info_read(m_handle, &msgQueueCount) ) - { - if ( msg->msg == CURLMSG_DONE ) - { - wxWebRequestCURL* request; - curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &request); - curl_multi_remove_handle(m_handle, msg->easy_handle); - request->HandleCompletion(); - } - } - - if ( activeRequests ) - { - // Wait for CURL work to finish - int numfds; - wx_curl_multi_wait(m_handle, 500, &numfds); - - if ( !numfds ) - { - repeats++; // count number of repeated zero numfds - if ( repeats > 1 ) - wxMilliSleep(100); - } - else - repeats = 0; - } - else - { - // Wait for new requests or shutdown of the session - m_condition.Wait(); - if ( !m_shuttingDown ) - activeRequests = -1; - } - } - - return (wxThread::ExitCode)0; -} - bool wxWebSessionCURL::StartRequest(wxWebRequestCURL & request) { // Add request easy handle to multi handle - curl_multi_add_handle(m_handle, request.GetHandle()); - - // Create and start session thread if not yet running - if ( !GetThread() ) - { - if ( CreateThread() ) - return false; - - if ( GetThread()->Run() != wxTHREAD_NO_ERROR ) - return false; - } + CURL* curl = request.GetHandle(); + curl_multi_add_handle(m_handle, curl); request.SetState(wxWebRequest::State_Active); - // Signal the worker thread to resume work - wxMutexLocker lock(m_mutex); - m_condition.Signal(); + // Report a timeout to curl to initiate this transfer. + int runningHandles; + curl_multi_socket_action(m_handle, CURL_SOCKET_TIMEOUT, 0, &runningHandles); return true; } void wxWebSessionCURL::CancelRequest(wxWebRequestCURL* request) { - // Add the request to a list of threads that will be removed from the curl - // multi handle in the worker thread - wxCriticalSectionLocker lock(m_cancelled.cs); - request->IncRef(); - m_cancelled.requests.push_back(wxObjectDataPtr(request)); + curl_multi_remove_handle(m_handle, request->GetHandle()); + request->SetState(wxWebRequest::State_Cancelled); } wxVersionInfo wxWebSessionCURL::GetLibraryVersionInfo() @@ -575,4 +969,164 @@ wxVersionInfo wxWebSessionCURL::GetLibraryVersionInfo() desc); } +// curl interacts with the wxWebSessionCURL class through 2 callback functions +// 1) TimerCallback is called whenever curl wants us to start or stop a timer. +// 2) SocketCallback is called when curl wants us to start monitoring a socket +// for activity. +// +// curl accomplishes the network transfers by calling the +// curl_multi_socket_action function to move pieces of the transfer to or from +// the system's network services. Consequently we call this function when a +// timeout occurs or when activity is detected on a socket. + +int wxWebSessionCURL::TimerCallback(CURLM* WXUNUSED(multi), long timeoutms, + void *userp) +{ + wxWebSessionCURL* session = reinterpret_cast(userp); + session->ProcessTimerCallback(timeoutms); + return 0; +} + +int wxWebSessionCURL::SocketCallback(CURL* WXUNUSED(easy), curl_socket_t sock, + int what, void* userp, void* WXUNUSED(sp)) +{ + wxWebSessionCURL* session = reinterpret_cast(userp); + session->ProcessSocketCallback(sock, what); + return CURLM_OK; +}; + +void wxWebSessionCURL::ProcessTimerCallback(long timeoutms) +{ + // When this callback is called, curl wants us to start or stop a timer. + // If timeoutms = -1, we should stop the timer. If timeoutms > 0, we should + // start a oneshot timer and when that timer expires, we should call + // curl_multi_socket_action(m_handle, CURL_SOCKET_TIMEOUT,... + // + // In the special case that timeoutms = 0, we should signal a timeout as + // soon as possible (as above by calling curl_multi_socket_action). But + // according to the curl documentation, we can't do that from this callback + // or we might cause an infinite loop. So use CallAfter to report the + // timeout at a slightly later time. + + if ( timeoutms > 0) + { + m_timeoutTimer.StartOnce(timeoutms); + } + else if ( timeoutms < 0 ) + { + m_timeoutTimer.Stop(); + } + else // timeoutms == 0 + { + CallAfter(&wxWebSessionCURL::ProcessTimeoutNotification); + } +} + +void wxWebSessionCURL::TimeoutNotification(wxTimerEvent& WXUNUSED(event)) +{ + ProcessTimeoutNotification(); +} + +void wxWebSessionCURL::ProcessTimeoutNotification() +{ + int runningHandles; + curl_multi_socket_action(m_handle, CURL_SOCKET_TIMEOUT, 0,&runningHandles); + + CheckForCompletedTransfers(); +} + +static int CurlPoll2SocketPoller(int what) +{ + int pollAction = SocketPoller::InvalidAction; + + if ( what == CURL_POLL_IN ) + { + pollAction = SocketPoller::PollForRead ; + } + else if ( what == CURL_POLL_OUT ) + { + pollAction = SocketPoller::PollForWrite; + } + else if ( what == CURL_POLL_INOUT ) + { + pollAction = SocketPoller::PollForRead | SocketPoller::PollForWrite; + } + + return pollAction; +} + +void wxWebSessionCURL::ProcessSocketCallback(curl_socket_t s, int what) +{ + // Have the socket poller start or stop monitoring a socket depending of + // the value of what. + + switch ( what ) + { + case CURL_POLL_IN: + wxFALLTHROUGH; + case CURL_POLL_OUT: + wxFALLTHROUGH; + case CURL_POLL_INOUT: + m_socketPoller->StartPolling(s, CurlPoll2SocketPoller(what), this); + break; + case CURL_POLL_REMOVE: + m_socketPoller->StopPolling(s); + break; + default: + wxLogDebug("Unknown socket action in ProcessSocketCallback"); + break; + } +} + +static int SocketPollerResult2CurlSelect(int socketEventFlag) +{ + int curlSelect = 0; + + if ( socketEventFlag & SocketPoller::ReadyForRead ) + { + curlSelect |= CURL_CSELECT_IN; + } + + if ( socketEventFlag & SocketPoller::ReadyForWrite ) + { + curlSelect |= CURL_CSELECT_OUT; + } + + if ( socketEventFlag & SocketPoller::HasError ) + { + curlSelect |= CURL_CSELECT_ERR; + } + + return curlSelect; +} + +void wxWebSessionCURL::ProcessSocketPollerResult(wxThreadEvent& event) +{ + // Convert the socket poller result to an action flag needed by curl. + // Then call curl_multi_socket_action. + curl_socket_t sock = event.GetPayload(); + int action = SocketPollerResult2CurlSelect(event.GetInt()); + int runningHandles; + curl_multi_socket_action(m_handle, sock, action, &runningHandles); + + CheckForCompletedTransfers(); + m_socketPoller->ResumePolling(sock); +} + +void wxWebSessionCURL::CheckForCompletedTransfers() +{ + // Process CURL message queue + int msgQueueCount; + while ( CURLMsg* msg = curl_multi_info_read(m_handle, &msgQueueCount) ) + { + if ( msg->msg == CURLMSG_DONE ) + { + wxWebRequestCURL* request; + curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &request); + curl_multi_remove_handle(m_handle, msg->easy_handle); + request->HandleCompletion(); + } + } +} + #endif // wxUSE_WEBREQUEST_CURL