diff --git a/include/wx/private/webrequest.h b/include/wx/private/webrequest.h index 0812304fac..497bbfc170 100644 --- a/include/wx/private/webrequest.h +++ b/include/wx/private/webrequest.h @@ -187,6 +187,10 @@ protected: void ReportDataReceived(size_t sizeReceived); + // This function can optionally be called to preallocate the read buffer, + // if the total amount of data to be downloaded is known in advance. + void PreAllocBuffer(size_t sizeNeeded); + private: // Called by wxWebRequestImpl only. friend class wxWebRequestImpl; diff --git a/include/wx/private/webrequest_curl.h b/include/wx/private/webrequest_curl.h index ec38041785..81b7aecdd7 100644 --- a/include/wx/private/webrequest_curl.h +++ b/include/wx/private/webrequest_curl.h @@ -16,12 +16,15 @@ #include "wx/thread.h" #include "wx/vector.h" +#include "wx/timer.h" +#include "wx/hashmap.h" #include "curl/curl.h" class wxWebRequestCURL; class wxWebResponseCURL; class wxWebSessionCURL; +class SocketPoller; class wxWebAuthChallengeCURL : public wxWebAuthChallengeImpl { @@ -112,10 +115,12 @@ public: // Methods called from libcurl callbacks size_t CURLOnWrite(void *buffer, size_t size); size_t CURLOnHeader(const char* buffer, size_t size); + int CURLOnProgress(curl_off_t); private: wxWebRequestHeaderMap m_headers; wxString m_statusText; + wxFileOffset m_knownDownloadSize; CURL* GetHandle() const { return static_cast(m_request).GetHandle(); } @@ -123,7 +128,7 @@ private: wxDECLARE_NO_COPY_CLASS(wxWebResponseCURL); }; -class wxWebSessionCURL : public wxWebSessionImpl, private wxThreadHelper +class wxWebSessionCURL : public wxWebSessionImpl, public wxEvtHandler { public: wxWebSessionCURL(); @@ -147,26 +152,40 @@ public: void CancelRequest(wxWebRequestCURL* request); -protected: - wxThread::ExitCode Entry() wxOVERRIDE; + void RequestHasTerminated(wxWebRequestCURL* request); + + static bool CurlRuntimeAtLeastVersion(unsigned int, unsigned int, + unsigned int); private: + static int TimerCallback(CURLM*, long, void*); + static int SocketCallback(CURL*, curl_socket_t, int, void*, void*); + + void ProcessTimerCallback(long); + void TimeoutNotification(wxTimerEvent&); + void ProcessTimeoutNotification(); + void ProcessSocketCallback(CURL*, curl_socket_t, int); + void ProcessSocketPollerResult(wxThreadEvent&); + void CheckForCompletedTransfers(); + void FailRequest(CURL*, const wxString&); + void StopActiveTransfer(CURL*); + void RemoveActiveSocket(CURL*); + + WX_DECLARE_HASH_MAP(CURL*, wxWebRequestCURL*, wxPointerHash, \ + wxPointerEqual, TransferSet); + + WX_DECLARE_HASH_MAP(CURL*, curl_socket_t, wxPointerHash, \ + wxPointerEqual, CurlSocketMap); + + TransferSet m_activeTransfers; + CurlSocketMap m_activeSockets; + + SocketPoller* m_socketPoller; + wxTimer m_timeoutTimer; CURLM* m_handle; - // Mutex and condition are used together to signal to the worker thread to - // wake up and mutex is also used to protected m_shuttingDown field. - wxMutex m_mutex; - wxCondition m_condition; - bool m_shuttingDown; - - // MT-safe vector of requests for which Cancel() was called. - struct CancelledData - { - wxCriticalSection cs; - wxVector< wxObjectDataPtr > requests; - } m_cancelled; - static int ms_activeSessions; + static unsigned int ms_runtimeVersion; wxDECLARE_NO_COPY_CLASS(wxWebSessionCURL); }; diff --git a/src/common/webrequest.cpp b/src/common/webrequest.cpp index beaf255223..20bf309dc2 100644 --- a/src/common/webrequest.cpp +++ b/src/common/webrequest.cpp @@ -690,6 +690,11 @@ void* wxWebResponseImpl::GetDataBuffer(size_t sizeNeeded) return m_readBuffer.GetAppendBuf(sizeNeeded); } +void wxWebResponseImpl::PreAllocBuffer(size_t sizeNeeded) +{ + m_readBuffer.SetBufSize(sizeNeeded); +} + void wxWebResponseImpl::ReportDataReceived(size_t sizeReceived) { m_readBuffer.UngetAppendBuf(sizeReceived); diff --git a/src/common/webrequest_curl.cpp b/src/common/webrequest_curl.cpp index e6e47e567c..17dc62b13d 100644 --- a/src/common/webrequest_curl.cpp +++ b/src/common/webrequest_curl.cpp @@ -23,6 +23,17 @@ #endif #include "wx/uri.h" +#include "wx/private/socket.h" +#include "wx/evtloop.h" + +#ifdef __WINDOWS__ + #include "wx/hashset.h" + #include "wx/msw/wrapwin.h" +#else + #include "wx/evtloopsrc.h" + #include "wx/evtloop.h" +#endif + // Define symbols that might be missing from older libcurl headers #ifndef CURL_AT_LEAST_VERSION @@ -31,12 +42,6 @@ (LIBCURL_VERSION_NUM >= CURL_VERSION_BITS(x, y, z)) #endif -#if CURL_AT_LEAST_VERSION(7, 28, 0) - #define wxCURL_HAVE_MULTI_WAIT 1 -#else - #define wxCURL_HAVE_MULTI_WAIT 0 -#endif - // The new name was introduced in curl 7.21.6. #ifndef CURLOPT_ACCEPT_ENCODING #define CURLOPT_ACCEPT_ENCODING CURLOPT_ENCODING @@ -60,12 +65,55 @@ static size_t wxCURLHeader(char *buffer, size_t size, size_t nitems, void *userd return static_cast(userdata)->CURLOnHeader(buffer, size * nitems); } +static int wxCURLXferInfo(void* clientp, curl_off_t dltotal, + curl_off_t WXUNUSED(dlnow), + curl_off_t WXUNUSED(ultotal), + curl_off_t WXUNUSED(ulnow)) +{ + wxCHECK_MSG( clientp, 0, "invalid curl progress callback data" ); + + wxWebResponseCURL* response = reinterpret_cast(clientp); + return response->CURLOnProgress(dltotal); +} + +static int wxCURLProgress(void* clientp, double dltotal, double dlnow, + double ultotal, double ulnow) +{ + return wxCURLXferInfo(clientp, static_cast(dltotal), + static_cast(dlnow), + static_cast(ultotal), + static_cast(ulnow)); +} + wxWebResponseCURL::wxWebResponseCURL(wxWebRequestCURL& request) : wxWebResponseImpl(request) { + m_knownDownloadSize = 0; + curl_easy_setopt(GetHandle(), CURLOPT_WRITEDATA, static_cast(this)); curl_easy_setopt(GetHandle(), CURLOPT_HEADERDATA, static_cast(this)); + // Set the progress callback. + #if CURL_AT_LEAST_VERSION(7, 32, 0) + if ( wxWebSessionCURL::CurlRuntimeAtLeastVersion(7, 32, 0) ) + { + curl_easy_setopt(GetHandle(), CURLOPT_XFERINFOFUNCTION, + wxCURLXferInfo); + curl_easy_setopt(GetHandle(), CURLOPT_XFERINFODATA, + static_cast(this)); + } + else + #endif + { + curl_easy_setopt(GetHandle(), CURLOPT_PROGRESSFUNCTION, + wxCURLProgress); + curl_easy_setopt(GetHandle(), CURLOPT_PROGRESSDATA, + static_cast(this)); + } + + // Have curl call the progress callback. + curl_easy_setopt(GetHandle(), CURLOPT_NOPROGRESS, 0L); + Init(); } @@ -104,6 +152,20 @@ size_t wxWebResponseCURL::CURLOnHeader(const char * buffer, size_t size) return size; } +int wxWebResponseCURL::CURLOnProgress(curl_off_t total) +{ + if ( m_knownDownloadSize != total ) + { + if ( m_request.GetStorage() == wxWebRequest::Storage_Memory ) + { + PreAllocBuffer(static_cast(total)); + } + m_knownDownloadSize = total; + } + + return 0; +} + wxFileOffset wxWebResponseCURL::GetContentLength() const { #if CURL_AT_LEAST_VERSION(7, 55, 0) @@ -176,8 +238,6 @@ wxWebRequestCURL::wxWebRequestCURL(wxWebSession & session, // Set error buffer to get more detailed CURL status m_errorBuffer[0] = '\0'; curl_easy_setopt(m_handle, CURLOPT_ERRORBUFFER, m_errorBuffer); - // Set this request in the private pointer - curl_easy_setopt(m_handle, CURLOPT_PRIVATE, static_cast(this)); // Set URL to handle: note that we must use wxURI to escape characters not // allowed in the URLs correctly (URL API is only available in libcurl // since the relatively recent v7.62.0, so we don't want to rely on it). @@ -202,8 +262,7 @@ wxWebRequestCURL::wxWebRequestCURL(wxWebSession & session, wxWebRequestCURL::~wxWebRequestCURL() { DestroyHeaderList(); - - curl_easy_cleanup(m_handle); + m_sessionImpl.RequestHasTerminated(this); } void wxWebRequestCURL::Start() @@ -361,39 +420,488 @@ void wxWebAuthChallengeCURL::SetCredentials(const wxWebCredentials& cred) m_request.StartRequest(); } +// +// SocketPoller - a helper class for wxWebSessionCURL +// + +wxDECLARE_EVENT(wxEVT_SOCKET_POLLER_RESULT, wxThreadEvent); + +class SocketPollerImpl; + +class SocketPoller +{ +public: + enum PollAction + { + INVALID_ACTION = 0x00, + POLL_FOR_READ = 0x01, + POLL_FOR_WRITE = 0x02 + }; + + enum Result + { + INVALID_RESULT = 0x00, + READY_FOR_READ = 0x01, + READY_FOR_WRITE = 0x02, + HAS_ERROR = 0x04 + }; + + SocketPoller(wxEvtHandler*); + ~SocketPoller(); + bool StartPolling(wxSOCKET_T, int); + void StopPolling(wxSOCKET_T); + void ResumePolling(wxSOCKET_T); + +private: + SocketPollerImpl* m_impl; +}; + +wxDEFINE_EVENT(wxEVT_SOCKET_POLLER_RESULT, wxThreadEvent); + +class SocketPollerImpl +{ +public: + virtual ~SocketPollerImpl(){}; + virtual bool StartPolling(wxSOCKET_T, int) = 0; + virtual void StopPolling(wxSOCKET_T) = 0; + virtual void ResumePolling(wxSOCKET_T) = 0; + + static SocketPollerImpl* Create(wxEvtHandler*); +}; + +SocketPoller::SocketPoller(wxEvtHandler* hndlr) +{ + m_impl = SocketPollerImpl::Create(hndlr); +} + +SocketPoller::~SocketPoller() +{ + delete m_impl; +} + +bool SocketPoller::StartPolling(wxSOCKET_T sock, int pollAction) +{ + return m_impl->StartPolling(sock, pollAction); +} +void SocketPoller::StopPolling(wxSOCKET_T sock) +{ + m_impl->StopPolling(sock); +} + +void SocketPoller::ResumePolling(wxSOCKET_T sock) +{ + m_impl->ResumePolling(sock); +} + +#ifdef __WINDOWS__ + +class WinSock1SocketPoller: public SocketPollerImpl +{ +public: + WinSock1SocketPoller(wxEvtHandler*); + virtual ~WinSock1SocketPoller(); + virtual bool StartPolling(wxSOCKET_T, int) wxOVERRIDE; + virtual void StopPolling(wxSOCKET_T) wxOVERRIDE; + virtual void ResumePolling(wxSOCKET_T) wxOVERRIDE; + +private: + static LRESULT CALLBACK MsgProc(HWND hwnd, WXUINT uMsg, WXWPARAM wParam, + WXLPARAM lParam); + static const WXUINT SOCKET_MESSAGE; + + WX_DECLARE_HASH_SET(wxSOCKET_T, wxIntegerHash, wxIntegerEqual, SocketSet); + + SocketSet m_polledSockets; + WXHWND m_hwnd; +}; + +const WXUINT WinSock1SocketPoller::SOCKET_MESSAGE = WM_USER + 1; + +WinSock1SocketPoller::WinSock1SocketPoller(wxEvtHandler* hndlr) +{ + // Initialize winsock in case it's not already done. + WORD wVersionRequested = MAKEWORD(1,1); + WSADATA wsaData; + WSAStartup(wVersionRequested, &wsaData); + + // Create a dummy message only window. + m_hwnd = CreateWindowEx( + 0, //DWORD dwExStyle, + TEXT("STATIC"), //LPCSTR lpClassName, + NULL, //LPCSTR lpWindowName, + 0, //DWORD dwStyle, + 0, //int X, + 0, //int Y, + 0, //int nWidth, + 0, //int nHeight, + HWND_MESSAGE, //HWND hWndParent, + NULL, //HMENU hMenu, + NULL, //HINSTANCE hInstance, + NULL //LPVOID lpParam + ); + + if ( m_hwnd == NULL ) + { + wxLogError("Unable to create message window for WinSock1SocketPoller"); + return; + } + + // Set the event handler to be the message window's user data. Also set the + // message window to use our MsgProc to process messages it receives. + SetWindowLongPtr(m_hwnd, GWLP_USERDATA, reinterpret_cast(hndlr)); + SetWindowLongPtr(m_hwnd, GWLP_WNDPROC, + reinterpret_cast(WinSock1SocketPoller::MsgProc)); +} + +WinSock1SocketPoller::~WinSock1SocketPoller() +{ + // Stop monitoring any leftover sockets. + for ( SocketSet::iterator it = m_polledSockets.begin() ; + it != m_polledSockets.end() ; ++it ) + { + WSAAsyncSelect(*it, m_hwnd, 0, 0); + } + + // Close the message window. + if ( m_hwnd ) + { + CloseWindow(m_hwnd); + } + + // Cleanup winsock. + WSACleanup(); +} + +bool WinSock1SocketPoller::StartPolling(wxSOCKET_T sock, int pollAction) +{ + StopPolling(sock); + + // Convert pollAction to a flag that can be used by winsock. + int winActions = 0; + + if ( pollAction & SocketPoller::POLL_FOR_READ ) + { + winActions |= FD_READ; + } + + if ( pollAction & SocketPoller::POLL_FOR_WRITE ) + { + winActions |= FD_WRITE; + } + + // Have winsock send a message to our window whenever activity is + // detected on the socket. + WSAAsyncSelect(sock, m_hwnd, SOCKET_MESSAGE, winActions); + + m_polledSockets.insert(sock); + return true; +} + +void WinSock1SocketPoller::StopPolling(wxSOCKET_T sock) +{ + SocketSet::iterator it = m_polledSockets.find(sock); + + if ( it != m_polledSockets.end() ) + { + // Stop sending messages when there is activity on the socket. + WSAAsyncSelect(sock, m_hwnd, 0, 0); + m_polledSockets.erase(it); + } +} + +void WinSock1SocketPoller::ResumePolling(wxSOCKET_T WXUNUSED(sock)) +{ +} + +LRESULT CALLBACK WinSock1SocketPoller::MsgProc(WXHWND hwnd, WXUINT uMsg, + WXWPARAM wParam, WXLPARAM lParam) +{ + // We only handle 1 message - the message we told winsock to send when + // it notices activity on sockets we are monitoring. + + if ( uMsg == SOCKET_MESSAGE ) + { + // Extract the result any any errors from lParam. + int winResult = LOWORD(lParam); + int error = HIWORD(lParam); + + // Convert the result/errors to a SocketPoller::Result flag. + int pollResult = 0; + + if ( winResult & FD_READ ) + { + pollResult |= SocketPoller::READY_FOR_READ; + } + + if ( winResult & FD_WRITE ) + { + pollResult |= SocketPoller::READY_FOR_WRITE; + } + + if ( error != 0 ) + { + pollResult |= SocketPoller::HAS_ERROR; + } + + // If there is a significant result, send an event. + if ( pollResult != 0 ) + { + // The event handler is stored in the window's user data and the + // socket with activity is given by wParam. + LONG_PTR userData = GetWindowLongPtr(hwnd, GWLP_USERDATA); + wxEvtHandler* hndlr = reinterpret_cast(userData); + wxSOCKET_T sock = wParam; + + wxThreadEvent* event = + new wxThreadEvent(wxEVT_SOCKET_POLLER_RESULT); + event->SetPayload(sock); + event->SetInt(pollResult); + + if ( wxThread::IsMain() ) + { + hndlr->ProcessEvent(*event); + delete event; + } + else + { + wxQueueEvent(hndlr, event); + } + } + + return 0; + } + else + { + return DefWindowProc(hwnd, uMsg, wParam, lParam); + } +} + +SocketPollerImpl* SocketPollerImpl::Create(wxEvtHandler* hndlr) +{ + return new WinSock1SocketPoller(hndlr); +} + +#else + +// SocketPollerSourceHandler - a source handler used by the SocketPoller class. + +class SocketPollerSourceHandler: public wxEventLoopSourceHandler +{ +public: + SocketPollerSourceHandler(wxSOCKET_T, wxEvtHandler*); + + void OnReadWaiting() wxOVERRIDE; + void OnWriteWaiting() wxOVERRIDE; + void OnExceptionWaiting() wxOVERRIDE; + ~SocketPollerSourceHandler(){} +private: + void SendEvent(int); + wxSOCKET_T m_socket; + wxEvtHandler* m_handler; +}; + +SocketPollerSourceHandler::SocketPollerSourceHandler(wxSOCKET_T sock, + wxEvtHandler* hndlr) +{ + m_socket = sock; + m_handler = hndlr; +} + +void SocketPollerSourceHandler::OnReadWaiting() +{ + SendEvent(SocketPoller::READY_FOR_READ); +} + +void SocketPollerSourceHandler::OnWriteWaiting() +{ + SendEvent(SocketPoller::READY_FOR_WRITE); +} + +void SocketPollerSourceHandler::OnExceptionWaiting() +{ + SendEvent(SocketPoller::HAS_ERROR); +} + +void SocketPollerSourceHandler::SendEvent(int result) +{ + wxThreadEvent event(wxEVT_SOCKET_POLLER_RESULT); + event.SetPayload(m_socket); + event.SetInt(result); + m_handler->ProcessEvent(event); +} + +// SourceSocketPoller - a SocketPollerImpl based on event loop sources. + +class SourceSocketPoller: public SocketPollerImpl +{ +public: + SourceSocketPoller(wxEvtHandler*); + ~SourceSocketPoller(); + bool StartPolling(wxSOCKET_T, int) wxOVERRIDE; + void StopPolling(wxSOCKET_T) wxOVERRIDE; + void ResumePolling(wxSOCKET_T) wxOVERRIDE; + +private: + WX_DECLARE_HASH_MAP(wxSOCKET_T, wxEventLoopSource*, wxIntegerHash,\ + wxIntegerEqual, SocketDataMap); + + void CleanUpSocketSource(wxEventLoopSource*); + + SocketDataMap m_socketData; + wxEvtHandler* m_handler; +}; + +SourceSocketPoller::SourceSocketPoller(wxEvtHandler* hndlr) +{ + m_handler = hndlr; +} + +SourceSocketPoller::~SourceSocketPoller() +{ + // Clean up any leftover socket data. + for ( SocketDataMap::iterator it = m_socketData.begin() ; + it != m_socketData.end() ; ++it ) + { + CleanUpSocketSource(it->second); + } +} + +static int SocketPoller2EventSource(int pollAction) +{ + // Convert the SocketPoller::PollAction value to a flag that can be used + // by wxEventLoopSource. + + // Always check for errors. + int eventSourceFlag = wxEVENT_SOURCE_EXCEPTION; + + if ( pollAction & SocketPoller::POLL_FOR_READ ) + { + eventSourceFlag |= wxEVENT_SOURCE_INPUT; + } + + if ( pollAction & SocketPoller::POLL_FOR_WRITE ) + { + eventSourceFlag |= wxEVENT_SOURCE_OUTPUT; + } + + return eventSourceFlag; +} + +bool SourceSocketPoller::StartPolling(wxSOCKET_T sock, int pollAction) +{ + SocketDataMap::iterator it = m_socketData.find(sock); + wxEventLoopSourceHandler* srcHandler = NULL; + + if ( it != m_socketData.end() ) + { + // If this socket is already being polled, reuse the old handler. Also + // delete the old source object to stop the old polling operations. + wxEventLoopSource* oldSrc = it->second; + srcHandler = oldSrc->GetHandler(); + + delete oldSrc; + } + else + { + // Otherwise create a new source handler. + srcHandler = + new SocketPollerSourceHandler(sock, m_handler); + } + + // Get a new source object for these polling checks. + bool socketIsPolled = true; + int eventSourceFlag = SocketPoller2EventSource(pollAction); + wxEventLoopSource* newSrc = + wxEventLoopBase::AddSourceForFD(sock, srcHandler, eventSourceFlag); + + if ( newSrc == NULL ) + { + // We were not able to add a source for this socket. + wxLogDebug(wxString::Format( + "Unable to create event loop source for %d", + static_cast(sock))); + + delete srcHandler; + socketIsPolled = false; + + if ( it != m_socketData.end() ) + { + m_socketData.erase(it); + } + } + else + { + m_socketData[sock] = newSrc; + } + + return socketIsPolled; +} + +void SourceSocketPoller::StopPolling(wxSOCKET_T sock) +{ + SocketDataMap::iterator it = m_socketData.find(sock); + + if ( it != m_socketData.end() ) + { + CleanUpSocketSource(it->second); + m_socketData.erase(it); + } +} + +void SourceSocketPoller::ResumePolling(wxSOCKET_T WXUNUSED(sock)) +{ +} + +void SourceSocketPoller::CleanUpSocketSource(wxEventLoopSource* source) +{ + wxEventLoopSourceHandler* srcHandler = source->GetHandler(); + delete source; + delete srcHandler; +} + +SocketPollerImpl* SocketPollerImpl::Create(wxEvtHandler* hndlr) +{ + return new SourceSocketPoller(hndlr); +} + +#endif + // // wxWebSessionCURL // int wxWebSessionCURL::ms_activeSessions = 0; +unsigned int wxWebSessionCURL::ms_runtimeVersion = 0; wxWebSessionCURL::wxWebSessionCURL() : - m_handle(NULL), - m_condition(m_mutex), - m_shuttingDown(false) + m_handle(NULL) { // Initialize CURL globally if no sessions are active if ( ms_activeSessions == 0 ) { if ( curl_global_init(CURL_GLOBAL_ALL) ) + { wxLogError(_("libcurl could not be initialized")); + } + else + { + curl_version_info_data* data = curl_version_info(CURLVERSION_NOW); + ms_runtimeVersion = data->version_num; + } } ms_activeSessions++; + + m_socketPoller = new SocketPoller(this); + m_timeoutTimer.SetOwner(this); + Bind(wxEVT_TIMER, &wxWebSessionCURL::TimeoutNotification, this); + Bind(wxEVT_SOCKET_POLLER_RESULT, + &wxWebSessionCURL::ProcessSocketPollerResult, this); } wxWebSessionCURL::~wxWebSessionCURL() { - { - // Notify the work thread - wxMutexLocker lock(m_mutex); - m_shuttingDown = true; - m_condition.Signal(); - } - - // Wait for work thread to finish - if ( GetThread() && GetThread()->IsRunning() ) - GetThread()->Wait(wxTHREAD_WAIT_BLOCK); + delete m_socketPoller; if ( m_handle ) curl_multi_cleanup(m_handle); @@ -419,147 +927,58 @@ wxWebSessionCURL::CreateRequest(wxWebSession& session, wxLogDebug("curl_multi_init() failed"); return wxWebRequestImplPtr(); } + else + { + curl_multi_setopt(m_handle, CURLMOPT_SOCKETDATA, this); + curl_multi_setopt(m_handle, CURLMOPT_SOCKETFUNCTION, SocketCallback); + curl_multi_setopt(m_handle, CURLMOPT_TIMERDATA, this); + curl_multi_setopt(m_handle, CURLMOPT_TIMERFUNCTION, TimerCallback); + } } return wxWebRequestImplPtr(new wxWebRequestCURL(session, *this, handler, url, id)); } -static CURLMcode wx_curl_multi_wait(CURLM *multi_handle, int timeout_ms, - int *ret) -{ - // since libcurl 7.28.0 the curl_multi_wait method is more convient than - // calling multiple curl_multi_... methods. - // When support for older libcurl versions is dropped this wrapper can be - // removed. -#if wxCURL_HAVE_MULTI_WAIT - return curl_multi_wait(multi_handle, NULL, 0, timeout_ms, ret); -#else - wxASSERT(ret != NULL); - - fd_set fdread; - fd_set fdwrite; - fd_set fdexcep; - timeval timeout; - - long curl_timeo; - - curl_multi_timeout(multi_handle, &curl_timeo); - if ( curl_timeo < 0 ) - curl_timeo = timeout_ms; - - timeout.tv_sec = curl_timeo / 1000; - timeout.tv_usec = (curl_timeo % 1000) * 1000; - - FD_ZERO(&fdread); - FD_ZERO(&fdwrite); - FD_ZERO(&fdexcep); - - curl_multi_fdset(multi_handle, &fdread, &fdwrite, &fdexcep, ret); - if ( *ret == -1 ) - return CURLM_OK; - else if ( select(*ret + 1, &fdread, &fdwrite, &fdexcep, &timeout) == -1 ) - return CURLM_BAD_SOCKET; - else - return CURLM_OK; -#endif -} - -wxThread::ExitCode wxWebSessionCURL::Entry() -{ - // This mutex will be unlocked only while we're waiting on the condition. - wxMutexLocker lock(m_mutex); - - int activeRequests = -1; - int repeats = 0; - - while ( activeRequests ) - { - // Handle cancelled requests - { - wxCriticalSectionLocker cancelledLock(m_cancelled.cs); - while ( !m_cancelled.requests.empty() ) - { - wxObjectDataPtr request(m_cancelled.requests.back()); - m_cancelled.requests.pop_back(); - curl_multi_remove_handle(m_handle, request->GetHandle()); - request->SetState(wxWebRequest::State_Cancelled); - } - } - - // Instruct CURL to work on requests - curl_multi_perform(m_handle, &activeRequests); - - // Process CURL message queue - int msgQueueCount; - while ( CURLMsg* msg = curl_multi_info_read(m_handle, &msgQueueCount) ) - { - if ( msg->msg == CURLMSG_DONE ) - { - wxWebRequestCURL* request; - curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &request); - curl_multi_remove_handle(m_handle, msg->easy_handle); - request->HandleCompletion(); - } - } - - if ( activeRequests ) - { - // Wait for CURL work to finish - int numfds; - wx_curl_multi_wait(m_handle, 500, &numfds); - - if ( !numfds ) - { - repeats++; // count number of repeated zero numfds - if ( repeats > 1 ) - wxMilliSleep(100); - } - else - repeats = 0; - } - else - { - // Wait for new requests or shutdown of the session - m_condition.Wait(); - if ( !m_shuttingDown ) - activeRequests = -1; - } - } - - return (wxThread::ExitCode)0; -} - bool wxWebSessionCURL::StartRequest(wxWebRequestCURL & request) { // Add request easy handle to multi handle - curl_multi_add_handle(m_handle, request.GetHandle()); + CURL* curl = request.GetHandle(); + int code = curl_multi_add_handle(m_handle, curl); - // Create and start session thread if not yet running - if ( !GetThread() ) + if ( code == CURLM_OK ) { - if ( CreateThread() ) - return false; + request.SetState(wxWebRequest::State_Active); + m_activeTransfers[curl] = &request; - if ( GetThread()->Run() != wxTHREAD_NO_ERROR ) - return false; + // Report a timeout to curl to initiate this transfer. + int runningHandles; + curl_multi_socket_action(m_handle, CURL_SOCKET_TIMEOUT, 0, + &runningHandles); + + return true; + } + else + { + return false; } - - request.SetState(wxWebRequest::State_Active); - - // Signal the worker thread to resume work - wxMutexLocker lock(m_mutex); - m_condition.Signal(); - - return true; } void wxWebSessionCURL::CancelRequest(wxWebRequestCURL* request) { - // Add the request to a list of threads that will be removed from the curl - // multi handle in the worker thread - wxCriticalSectionLocker lock(m_cancelled.cs); - request->IncRef(); - m_cancelled.requests.push_back(wxObjectDataPtr(request)); + // If this transfer is currently active, stop it. + CURL* curl = request->GetHandle(); + StopActiveTransfer(curl); + + request->SetState(wxWebRequest::State_Cancelled); +} + +void wxWebSessionCURL::RequestHasTerminated(wxWebRequestCURL* request) +{ + // If this transfer is currently active, stop it. + CURL* curl = request->GetHandle(); + StopActiveTransfer(curl); + + curl_easy_cleanup(curl); } wxVersionInfo wxWebSessionCURL::GetLibraryVersionInfo() @@ -575,4 +994,253 @@ wxVersionInfo wxWebSessionCURL::GetLibraryVersionInfo() desc); } +bool wxWebSessionCURL::CurlRuntimeAtLeastVersion(unsigned int major, + unsigned int minor, + unsigned int patch) +{ + return (ms_runtimeVersion >= CURL_VERSION_BITS(major, minor, patch)); +} + +// curl interacts with the wxWebSessionCURL class through 2 callback functions +// 1) TimerCallback is called whenever curl wants us to start or stop a timer. +// 2) SocketCallback is called when curl wants us to start monitoring a socket +// for activity. +// +// curl accomplishes the network transfers by calling the +// curl_multi_socket_action function to move pieces of the transfer to or from +// the system's network services. Consequently we call this function when a +// timeout occurs or when activity is detected on a socket. + +int wxWebSessionCURL::TimerCallback(CURLM* WXUNUSED(multi), long timeoutms, + void *userp) +{ + wxWebSessionCURL* session = reinterpret_cast(userp); + session->ProcessTimerCallback(timeoutms); + return 0; +} + +int wxWebSessionCURL::SocketCallback(CURL* curl, curl_socket_t sock, int what, + void* userp, void* WXUNUSED(sp)) +{ + wxWebSessionCURL* session = reinterpret_cast(userp); + session->ProcessSocketCallback(curl, sock, what); + return CURLM_OK; +}; + +void wxWebSessionCURL::ProcessTimerCallback(long timeoutms) +{ + // When this callback is called, curl wants us to start or stop a timer. + // If timeoutms = -1, we should stop the timer. If timeoutms > 0, we should + // start a oneshot timer and when that timer expires, we should call + // curl_multi_socket_action(m_handle, CURL_SOCKET_TIMEOUT,... + // + // In the special case that timeoutms = 0, we should signal a timeout as + // soon as possible (as above by calling curl_multi_socket_action). But + // according to the curl documentation, we can't do that from this callback + // or we might cause an infinite loop. So use CallAfter to report the + // timeout at a slightly later time. + + if ( timeoutms > 0) + { + m_timeoutTimer.StartOnce(timeoutms); + } + else if ( timeoutms < 0 ) + { + m_timeoutTimer.Stop(); + } + else // timeoutms == 0 + { + CallAfter(&wxWebSessionCURL::ProcessTimeoutNotification); + } +} + +void wxWebSessionCURL::TimeoutNotification(wxTimerEvent& WXUNUSED(event)) +{ + ProcessTimeoutNotification(); +} + +void wxWebSessionCURL::ProcessTimeoutNotification() +{ + int runningHandles; + curl_multi_socket_action(m_handle, CURL_SOCKET_TIMEOUT, 0, &runningHandles); + + CheckForCompletedTransfers(); +} + +static int CurlPoll2SocketPoller(int what) +{ + int pollAction = SocketPoller::INVALID_ACTION; + + if ( what == CURL_POLL_IN ) + { + pollAction = SocketPoller::POLL_FOR_READ ; + } + else if ( what == CURL_POLL_OUT ) + { + pollAction = SocketPoller::POLL_FOR_WRITE; + } + else if ( what == CURL_POLL_INOUT ) + { + pollAction = + SocketPoller::POLL_FOR_READ | SocketPoller::POLL_FOR_WRITE; + } + + return pollAction; +} + +void wxWebSessionCURL::ProcessSocketCallback(CURL* curl, curl_socket_t s, + int what) +{ + // Have the socket poller start or stop monitoring a socket depending of + // the value of what. + + switch ( what ) + { + case CURL_POLL_IN: + wxFALLTHROUGH; + case CURL_POLL_OUT: + wxFALLTHROUGH; + case CURL_POLL_INOUT: + { + m_activeSockets[curl] = s; + + int pollAction = CurlPoll2SocketPoller(what); + bool socketIsMonitored = m_socketPoller->StartPolling(s, + pollAction); + + if ( !socketIsMonitored ) + { + TransferSet::iterator it = m_activeTransfers.find(curl); + + if ( it != m_activeTransfers.end() ) + { + FailRequest(curl, + "wxWebSession failed to monitor a socket for this " + "transfer"); + } + } + } + break; + case CURL_POLL_REMOVE: + m_socketPoller->StopPolling(s); + RemoveActiveSocket(curl); + break; + default: + wxLogDebug("Unknown socket action in ProcessSocketCallback"); + break; + } +} + +static int SocketPollerResult2CurlSelect(int socketEventFlag) +{ + int curlSelect = 0; + + if ( socketEventFlag & SocketPoller::READY_FOR_READ ) + { + curlSelect |= CURL_CSELECT_IN; + } + + if ( socketEventFlag & SocketPoller::READY_FOR_WRITE ) + { + curlSelect |= CURL_CSELECT_OUT; + } + + if ( socketEventFlag & SocketPoller::HAS_ERROR ) + { + curlSelect |= CURL_CSELECT_ERR; + } + + return curlSelect; +} + +void wxWebSessionCURL::ProcessSocketPollerResult(wxThreadEvent& event) +{ + // Convert the socket poller result to an action flag needed by curl. + // Then call curl_multi_socket_action. + curl_socket_t sock = event.GetPayload(); + int action = SocketPollerResult2CurlSelect(event.GetInt()); + int runningHandles; + curl_multi_socket_action(m_handle, sock, action, &runningHandles); + + CheckForCompletedTransfers(); + m_socketPoller->ResumePolling(sock); +} + +void wxWebSessionCURL::CheckForCompletedTransfers() +{ + // Process CURL message queue + int msgQueueCount; + while ( CURLMsg* msg = curl_multi_info_read(m_handle, &msgQueueCount) ) + { + if ( msg->msg == CURLMSG_DONE ) + { + CURL* curl = msg->easy_handle; + TransferSet::iterator it = m_activeTransfers.find(curl); + + if ( it != m_activeTransfers.end() ) + { + wxWebRequestCURL* request = it->second; + curl_multi_remove_handle(m_handle, curl); + request->HandleCompletion(); + m_activeTransfers.erase(it); + RemoveActiveSocket(curl); + } + } + } +} + +void wxWebSessionCURL::FailRequest(CURL* curl,const wxString& msg) +{ + TransferSet::iterator it = m_activeTransfers.find(curl); + + if ( it != m_activeTransfers.end() ) + { + wxWebRequestCURL* request = it->second; + StopActiveTransfer(curl); + + request->SetState(wxWebRequest::State_Failed, msg); + } +} + +void wxWebSessionCURL::StopActiveTransfer(CURL* curl) +{ + TransferSet::iterator it = m_activeTransfers.find(curl); + + if ( it != m_activeTransfers.end() ) + { + // Record the current active socket now since it should be removed from + // the m_activeSockets map when we call curl_multi_remove_handle. + curl_socket_t activeSocket = CURL_SOCKET_BAD; + CurlSocketMap::iterator it2 = m_activeSockets.find(curl); + + if ( it2 != m_activeSockets.end() ) + { + activeSocket = it2->second; + } + + // Remove the CURL easy handle from the CURLM multi handle. + curl_multi_remove_handle(m_handle, curl); + + // If the transfer was active, close its socket. + if ( activeSocket != CURL_SOCKET_BAD ) + { + wxCloseSocket(activeSocket); + } + + // Clean up the maps. + RemoveActiveSocket(curl); + m_activeTransfers.erase(it); + } +} + +void wxWebSessionCURL::RemoveActiveSocket(CURL* curl) +{ + CurlSocketMap::iterator it = m_activeSockets.find(curl); + + if ( it != m_activeSockets.end() ) + { + m_activeSockets.erase(it); + } +} + #endif // wxUSE_WEBREQUEST_CURL