From b1b53ed85ce502180f0efb2f132e2762c9609cfc Mon Sep 17 00:00:00 2001 From: New Pagodi Date: Mon, 25 Jan 2021 16:26:27 -0600 Subject: [PATCH 01/12] Remove the worker thread from wxWebSessionCURL Instead of having wxWebSessionCURL run a worker thread that uses curl to monitor and process network activity, set up a separate socket poller class to monitor socket activity. The socket poller class will throw an event back to wxWebSessionCURL when it detects activity on the sockets so that it can tell curl to process the activity in the main thread. --- include/wx/private/webrequest_curl.h | 32 +- src/common/webrequest_curl.cpp | 828 ++++++++++++++++++++++----- 2 files changed, 706 insertions(+), 154 deletions(-) diff --git a/include/wx/private/webrequest_curl.h b/include/wx/private/webrequest_curl.h index ec38041785..b0aa565584 100644 --- a/include/wx/private/webrequest_curl.h +++ b/include/wx/private/webrequest_curl.h @@ -16,12 +16,14 @@ #include "wx/thread.h" #include "wx/vector.h" +#include "wx/timer.h" #include "curl/curl.h" class wxWebRequestCURL; class wxWebResponseCURL; class wxWebSessionCURL; +class SocketPoller; class wxWebAuthChallengeCURL : public wxWebAuthChallengeImpl { @@ -123,7 +125,7 @@ private: wxDECLARE_NO_COPY_CLASS(wxWebResponseCURL); }; -class wxWebSessionCURL : public wxWebSessionImpl, private wxThreadHelper +class wxWebSessionCURL : public wxWebSessionImpl, public wxEvtHandler { public: wxWebSessionCURL(); @@ -147,25 +149,21 @@ public: void CancelRequest(wxWebRequestCURL* request); -protected: - wxThread::ExitCode Entry() wxOVERRIDE; - private: + static int TimerCallback(CURLM*, long, void*); + static int SocketCallback(CURL*, curl_socket_t, int, void*, void*); + + void ProcessTimerCallback(long); + void TimeoutNotification(wxTimerEvent&); + void ProcessTimeoutNotification(); + void ProcessSocketCallback(curl_socket_t, int); + void ProcessSocketPollerResult(wxThreadEvent&); + void CheckForCompletedTransfers(); + + SocketPoller* m_socketPoller; + wxTimer m_timeoutTimer; CURLM* m_handle; - // Mutex and condition are used together to signal to the worker thread to - // wake up and mutex is also used to protected m_shuttingDown field. - wxMutex m_mutex; - wxCondition m_condition; - bool m_shuttingDown; - - // MT-safe vector of requests for which Cancel() was called. - struct CancelledData - { - wxCriticalSection cs; - wxVector< wxObjectDataPtr > requests; - } m_cancelled; - static int ms_activeSessions; wxDECLARE_NO_COPY_CLASS(wxWebSessionCURL); diff --git a/src/common/webrequest_curl.cpp b/src/common/webrequest_curl.cpp index e6e47e567c..d82c41e43f 100644 --- a/src/common/webrequest_curl.cpp +++ b/src/common/webrequest_curl.cpp @@ -23,6 +23,19 @@ #endif #include "wx/uri.h" +#include "wx/socket.h" +#include "wx/msgqueue.h" +#include "wx/hashset.h" +#include "wx/hashmap.h" + +#ifdef __WXMSW__ + #include +#else + #include + #include +#endif + + // Define symbols that might be missing from older libcurl headers #ifndef CURL_AT_LEAST_VERSION @@ -361,6 +374,503 @@ void wxWebAuthChallengeCURL::SetCredentials(const wxWebCredentials& cred) m_request.StartRequest(); } + +wxDECLARE_EVENT(wxSocketAction,wxThreadEvent); + +class SocketPoller: public wxThreadHelper +{ +public: + enum PollAction + { + InvalidAction = 0, + PollForRead = 1, + PollForWrite = 2, + PollForError = 4 + }; + + enum Result + { + InvalidResult= 0, + ReadyForRead = 1, + ReadyForWrite = 2, + HasError = 4 + }; + + SocketPoller(); + ~SocketPoller(); + bool StartPolling(wxSOCKET_T, int, wxEvtHandler*); + void StopPolling(wxSOCKET_T); + void ResumePolling(wxSOCKET_T); + +private: + enum + { +#ifdef INVALID_SOCKET + SOCKET_POLLER_INVALID_SOCKET = INVALID_SOCKET +#else + SOCKET_POLLER_INVALID_SOCKET = ((wxSOCKET_T)(~0)) +#endif + }; + + class Message + { + public: + enum MessageId + { + AddSocketAction, + DeleteSocketAction, + ResumePolling, + Quit, + LastMessageId + }; + + Message(MessageId i = LastMessageId, + wxSOCKET_T s = SOCKET_POLLER_INVALID_SOCKET, + int f = 0, + wxEvtHandler* h = NULL) + { + m_messageId = i; + m_socket = s; + m_flags = f; + m_evtHandler = h; + } + + MessageId GetMessageId() const {return m_messageId;} + wxSOCKET_T GetSocket() const {return m_socket;} + int GetFlags() const {return m_flags;} + wxEvtHandler* GetEvtHandler() const {return m_evtHandler;} + + private: + wxSOCKET_T m_socket; + int m_flags; + MessageId m_messageId; + wxEvtHandler* m_evtHandler; + }; + + struct SocketData + { + int m_pollAction; + void* m_event; + wxEvtHandler* m_handler; + }; + + WX_DECLARE_HASH_SET(wxSOCKET_T, wxIntegerHash, wxIntegerEqual, SocketSet ); + WX_DECLARE_HASH_MAP(wxSOCKET_T, SocketData, wxIntegerHash, wxIntegerEqual, \ + SocketDataMap ); + + // Items used from main thread. + void CreateSocketPair(); + void CloseSocket(wxSOCKET_T); + void PostAndSignal(const Message&); + + SocketSet m_polledSockets; + wxSOCKET_T m_writeEnd; + + + // Items used in both threads. + wxMessageQueue m_msgQueue; + wxSOCKET_T m_readEnd; + + + // Items used from worker thread. + virtual wxThread::ExitCode Entry() wxOVERRIDE; + + void ThreadRemoveFromDL(wxSOCKET_T); + void ThreadSetSocketAction(wxSOCKET_T, int,wxEvtHandler*); + void ThreadDeleteSocketAction(wxSOCKET_T); + void ThreadCheckSockets(); + + SocketDataMap m_socketData; + SocketSet m_disabledList; +}; + +wxDEFINE_EVENT(wxSocketAction,wxThreadEvent); + + +SocketPoller::SocketPoller() +{ + m_writeEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET; + m_readEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET; + CreateSocketPair(); + + if (CreateThread(wxTHREAD_JOINABLE) != wxTHREAD_NO_ERROR) + { + wxLogDebug("Could not create socket poller worker thread!"); + return; + } + + if (GetThread()->Run() != wxTHREAD_NO_ERROR) + { + wxLogDebug("Could not run socket poller worker thread!"); + return; + } +} + +SocketPoller::~SocketPoller() +{ + SocketPoller::Message msg(Message::Quit); + PostAndSignal(msg); + + GetThread()->Wait(); + + CloseSocket(m_writeEnd); + CloseSocket(m_readEnd); + +#ifdef __WXMSW__ + WSACleanup(); +#endif +} + +void SocketPoller::CreateSocketPair() +{ +#ifdef __WXMSW__ + WORD wVersionRequested = MAKEWORD(1,1); + WSADATA wsaData; + WSAStartup(wVersionRequested, &wsaData); + + m_writeEnd = socket(AF_INET, SOCK_DGRAM, 0); + + if ( m_writeEnd == INVALID_SOCKET ) + { + wxLogDebug("Unable to create write end of socket pair."); + m_writeEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET; + return; + } + + m_readEnd = socket(AF_INET, SOCK_DGRAM, 0); + + if ( m_readEnd == INVALID_SOCKET ) + { + wxLogDebug("Unable to create read end of socket pair."); + CloseSocket(m_writeEnd); + m_writeEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET; + m_readEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET; + return; + } + + // Bind the read end. This will assign it to an unused port. + struct sockaddr_in serverAddr; + serverAddr.sin_family = AF_INET; + serverAddr.sin_port = htons(0); + serverAddr.sin_addr.s_addr = inet_addr("127.0.0.1"); + + int retcode = bind(m_readEnd, (struct sockaddr *)&serverAddr, + sizeof(serverAddr)); + if ( retcode < 0 ) + { + wxLogDebug("Unable bind socket to port."); + CloseSocket(m_writeEnd); + m_writeEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET; + CloseSocket(m_readEnd); + m_readEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET; + return; + } + + // Get the ip address and port of the read end. + struct sockaddr_in readEndAddr; + ZeroMemory(&readEndAddr, sizeof(readEndAddr)); + int len = sizeof(readEndAddr); + getsockname(m_readEnd, (struct sockaddr *) &readEndAddr, &len); + + // Unlike with stream sockets, this is a synchronous operation and just + // sets location for the send function. + connect(m_writeEnd, (struct sockaddr *)&readEndAddr, sizeof(readEndAddr)); +#else + int fd[2]; + socketpair(AF_UNIX, SOCK_STREAM, 0, fd); + + m_writeEnd = fd[0]; + m_readEnd = fd[1]; +#endif +} + +void SocketPoller::CloseSocket(wxSOCKET_T s) +{ +#ifdef __WXMSW__ + closesocket(s); +#else + close(s); +#endif +} + +bool SocketPoller::StartPolling(wxSOCKET_T s, int flags, wxEvtHandler* hndlr) +{ + SocketSet::iterator it = m_polledSockets.find(s); + + if ( m_polledSockets.size() > 1023 ) + { + if ( it == m_polledSockets.end() ) + { + return false; + } + } + + if ( it == m_polledSockets.end() ) + { + m_polledSockets.insert(s); + } + + SocketPoller::Message msg(Message::AddSocketAction, s, flags, hndlr); + PostAndSignal(msg); + + return true; +} + +void SocketPoller::StopPolling(wxSOCKET_T s) +{ + SocketPoller::Message msg(Message::DeleteSocketAction, s); + PostAndSignal(msg); + + SocketSet::iterator it = m_polledSockets.find(s); + + if ( it != m_polledSockets.end() ) + { + m_polledSockets.erase(it); + } +} + +void SocketPoller::ResumePolling(wxSOCKET_T s) +{ + SocketPoller::Message msg(Message::ResumePolling, s); + PostAndSignal(msg); +} + +void SocketPoller::PostAndSignal(const Message& msg) +{ + m_msgQueue.Post(msg); + + if ( m_writeEnd != SocketPoller::SOCKET_POLLER_INVALID_SOCKET ) + { + char c = 32; + send(m_writeEnd, &c, 1, 0); + } +} + +wxThread::ExitCode SocketPoller::Entry() +{ + wxMessageQueueError er; + SocketPoller::Message msg; + bool done = false; + size_t socketsUnderConsideration = 0; + int timeOut = 50; + Message::MessageId id; + + while ( !done ) + { + if ( GetThread()->TestDestroy() ) + { + break; + } + + // Process all messages in the message queue. If we currently have no + // sockets, wait a little for a message to come in. + timeOut = (socketsUnderConsideration == 0) ? 50 : 0; + er = wxMSGQUEUE_NO_ERROR; + + while ( er == wxMSGQUEUE_NO_ERROR ) + { + er = m_msgQueue.ReceiveTimeout(timeOut, msg); + id = msg.GetMessageId(); + + if ( er == wxMSGQUEUE_TIMEOUT ) + { + break; + } + else if ( er == wxMSGQUEUE_MISC_ERROR ) + { + wxLogDebug("Error with socket poller message queue."); + done = true; + break; + } + else if ( id == Message::Quit ) + { + done = true; + break; + } + else if ( id == Message::AddSocketAction ) + { + wxSOCKET_T s = msg.GetSocket(); + int f = msg.GetFlags(); + wxEvtHandler* hndlr = msg.GetEvtHandler(); + ThreadSetSocketAction(s, f, hndlr); + } + else if ( id == Message::DeleteSocketAction ) + { + wxSOCKET_T s = msg.GetSocket(); + ThreadDeleteSocketAction(s); + } + else if ( id == Message::ResumePolling ) + { + wxSOCKET_T s = msg.GetSocket(); + ThreadRemoveFromDL(s); + } + + timeOut = 0; + } + + socketsUnderConsideration = m_socketData.size() - m_disabledList.size(); + + if ( socketsUnderConsideration > 0 && !done ) + { + ThreadCheckSockets(); + } + } + + return static_cast(0); +} + +void SocketPoller::ThreadRemoveFromDL(wxSOCKET_T s) +{ + SocketSet::iterator it = m_disabledList.find(s); + + if ( it != m_disabledList.end() ) + { + m_disabledList.erase(it); + } +} + +void SocketPoller::ThreadDeleteSocketAction(wxSOCKET_T sock) +{ + ThreadRemoveFromDL(sock); + + SocketDataMap::iterator it = m_socketData.find(sock); + + if ( it != m_socketData.end() ) + { + m_socketData.erase(it); + } +} + +void SocketPoller::ThreadSetSocketAction(wxSOCKET_T sock, int flags, + wxEvtHandler* hndlr) +{ + ThreadDeleteSocketAction(sock); + + SocketData data; + data.m_pollAction = flags; + data.m_event = NULL; + data.m_handler = hndlr; + + m_socketData[sock] = data; +} + +void SocketPoller::ThreadCheckSockets() +{ + fd_set readFds, writeFds, errorFds; + FD_ZERO(&readFds); + FD_ZERO(&writeFds); + FD_ZERO(&errorFds); + + wxSOCKET_T maxSd = 0; + + if ( m_readEnd != SocketPoller::SOCKET_POLLER_INVALID_SOCKET ) + { + FD_SET(m_readEnd, &readFds); + maxSd = m_readEnd; + } + + for ( SocketDataMap::iterator it = m_socketData.begin() ; + it != m_socketData.end() ; ++it ) + { + wxSOCKET_T sock = it->first; + + if ( m_disabledList.find(sock) != m_disabledList.end() ) + { + continue; + } + + int checkAction = it->second.m_pollAction; + + if ( checkAction & PollForRead ) + { + FD_SET(sock, &readFds); + } + + if ( checkAction & PollForWrite ) + { + FD_SET(sock, &writeFds); + } + + FD_SET(sock, &errorFds); + + if ( sock > maxSd ) + { + maxSd = sock; + } + } + + struct timeval timeout; + timeout.tv_sec = 0; // 1s timeout + timeout.tv_usec = 50*1000; + + int selectStatus = select(maxSd+1, &readFds, &writeFds, &errorFds,&timeout); + + if ( selectStatus < 0 ) + { + // Massive error: do something + } + else if ( selectStatus == 0 ) + { + ;// select timed out. There is no need to do anything. + } + else + { + if ( (m_readEnd != SocketPoller::SOCKET_POLLER_INVALID_SOCKET) + && FD_ISSET(m_readEnd, &readFds) ) + { + char c; + recv(m_readEnd, &c, 1, 0); + } + + for ( SocketDataMap::iterator it = m_socketData.begin() ; + it != m_socketData.end() ; ++it ) + { + wxSOCKET_T sock = it->first; + + if ( m_disabledList.find(sock) != m_disabledList.end() ) + { + continue; + } + + int checkActions = it->second.m_pollAction; + wxEvtHandler* hndlr = it->second.m_handler; + int result = InvalidResult; + + if ( checkActions & PollForRead ) + { + if ( FD_ISSET(sock, &readFds) ) + { + result |= ReadyForRead; + } + } + + if ( checkActions & ReadyForWrite ) + { + if ( FD_ISSET(sock, &writeFds) ) + { + result |= ReadyForWrite; + } + } + + if ( FD_ISSET(sock, &errorFds) ) + { + result |= HasError ; + } + + if ( result != InvalidResult && hndlr != NULL ) + { + wxThreadEvent* event = new wxThreadEvent(wxSocketAction); + event->SetPayload(sock); + event->SetInt(result); + wxQueueEvent(hndlr,event); + m_disabledList.insert(sock); + } + } + } +} + + + // // wxWebSessionCURL // @@ -368,9 +878,7 @@ void wxWebAuthChallengeCURL::SetCredentials(const wxWebCredentials& cred) int wxWebSessionCURL::ms_activeSessions = 0; wxWebSessionCURL::wxWebSessionCURL() : - m_handle(NULL), - m_condition(m_mutex), - m_shuttingDown(false) + m_handle(NULL) { // Initialize CURL globally if no sessions are active if ( ms_activeSessions == 0 ) @@ -380,20 +888,16 @@ wxWebSessionCURL::wxWebSessionCURL() : } ms_activeSessions++; + + m_socketPoller = new SocketPoller(); + m_timeoutTimer.SetOwner(this); + Bind(wxEVT_TIMER ,&wxWebSessionCURL::TimeoutNotification, this); + Bind(wxSocketAction, &wxWebSessionCURL::ProcessSocketPollerResult, this); } wxWebSessionCURL::~wxWebSessionCURL() { - { - // Notify the work thread - wxMutexLocker lock(m_mutex); - m_shuttingDown = true; - m_condition.Signal(); - } - - // Wait for work thread to finish - if ( GetThread() && GetThread()->IsRunning() ) - GetThread()->Wait(wxTHREAD_WAIT_BLOCK); + delete m_socketPoller; if ( m_handle ) curl_multi_cleanup(m_handle); @@ -419,147 +923,37 @@ wxWebSessionCURL::CreateRequest(wxWebSession& session, wxLogDebug("curl_multi_init() failed"); return wxWebRequestImplPtr(); } + else + { + curl_multi_setopt(m_handle, CURLMOPT_SOCKETDATA, this); + curl_multi_setopt(m_handle, CURLMOPT_SOCKETFUNCTION,SocketCallback); + curl_multi_setopt(m_handle, CURLMOPT_TIMERDATA, this); + curl_multi_setopt(m_handle, CURLMOPT_TIMERFUNCTION, TimerCallback); + } } return wxWebRequestImplPtr(new wxWebRequestCURL(session, *this, handler, url, id)); } -static CURLMcode wx_curl_multi_wait(CURLM *multi_handle, int timeout_ms, - int *ret) -{ - // since libcurl 7.28.0 the curl_multi_wait method is more convient than - // calling multiple curl_multi_... methods. - // When support for older libcurl versions is dropped this wrapper can be - // removed. -#if wxCURL_HAVE_MULTI_WAIT - return curl_multi_wait(multi_handle, NULL, 0, timeout_ms, ret); -#else - wxASSERT(ret != NULL); - - fd_set fdread; - fd_set fdwrite; - fd_set fdexcep; - timeval timeout; - - long curl_timeo; - - curl_multi_timeout(multi_handle, &curl_timeo); - if ( curl_timeo < 0 ) - curl_timeo = timeout_ms; - - timeout.tv_sec = curl_timeo / 1000; - timeout.tv_usec = (curl_timeo % 1000) * 1000; - - FD_ZERO(&fdread); - FD_ZERO(&fdwrite); - FD_ZERO(&fdexcep); - - curl_multi_fdset(multi_handle, &fdread, &fdwrite, &fdexcep, ret); - if ( *ret == -1 ) - return CURLM_OK; - else if ( select(*ret + 1, &fdread, &fdwrite, &fdexcep, &timeout) == -1 ) - return CURLM_BAD_SOCKET; - else - return CURLM_OK; -#endif -} - -wxThread::ExitCode wxWebSessionCURL::Entry() -{ - // This mutex will be unlocked only while we're waiting on the condition. - wxMutexLocker lock(m_mutex); - - int activeRequests = -1; - int repeats = 0; - - while ( activeRequests ) - { - // Handle cancelled requests - { - wxCriticalSectionLocker cancelledLock(m_cancelled.cs); - while ( !m_cancelled.requests.empty() ) - { - wxObjectDataPtr request(m_cancelled.requests.back()); - m_cancelled.requests.pop_back(); - curl_multi_remove_handle(m_handle, request->GetHandle()); - request->SetState(wxWebRequest::State_Cancelled); - } - } - - // Instruct CURL to work on requests - curl_multi_perform(m_handle, &activeRequests); - - // Process CURL message queue - int msgQueueCount; - while ( CURLMsg* msg = curl_multi_info_read(m_handle, &msgQueueCount) ) - { - if ( msg->msg == CURLMSG_DONE ) - { - wxWebRequestCURL* request; - curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &request); - curl_multi_remove_handle(m_handle, msg->easy_handle); - request->HandleCompletion(); - } - } - - if ( activeRequests ) - { - // Wait for CURL work to finish - int numfds; - wx_curl_multi_wait(m_handle, 500, &numfds); - - if ( !numfds ) - { - repeats++; // count number of repeated zero numfds - if ( repeats > 1 ) - wxMilliSleep(100); - } - else - repeats = 0; - } - else - { - // Wait for new requests or shutdown of the session - m_condition.Wait(); - if ( !m_shuttingDown ) - activeRequests = -1; - } - } - - return (wxThread::ExitCode)0; -} - bool wxWebSessionCURL::StartRequest(wxWebRequestCURL & request) { // Add request easy handle to multi handle - curl_multi_add_handle(m_handle, request.GetHandle()); - - // Create and start session thread if not yet running - if ( !GetThread() ) - { - if ( CreateThread() ) - return false; - - if ( GetThread()->Run() != wxTHREAD_NO_ERROR ) - return false; - } + CURL* curl = request.GetHandle(); + curl_multi_add_handle(m_handle, curl); request.SetState(wxWebRequest::State_Active); - // Signal the worker thread to resume work - wxMutexLocker lock(m_mutex); - m_condition.Signal(); + // Report a timeout to curl to initiate this transfer. + int runningHandles; + curl_multi_socket_action(m_handle, CURL_SOCKET_TIMEOUT, 0, &runningHandles); return true; } void wxWebSessionCURL::CancelRequest(wxWebRequestCURL* request) { - // Add the request to a list of threads that will be removed from the curl - // multi handle in the worker thread - wxCriticalSectionLocker lock(m_cancelled.cs); - request->IncRef(); - m_cancelled.requests.push_back(wxObjectDataPtr(request)); + curl_multi_remove_handle(m_handle, request->GetHandle()); + request->SetState(wxWebRequest::State_Cancelled); } wxVersionInfo wxWebSessionCURL::GetLibraryVersionInfo() @@ -575,4 +969,164 @@ wxVersionInfo wxWebSessionCURL::GetLibraryVersionInfo() desc); } +// curl interacts with the wxWebSessionCURL class through 2 callback functions +// 1) TimerCallback is called whenever curl wants us to start or stop a timer. +// 2) SocketCallback is called when curl wants us to start monitoring a socket +// for activity. +// +// curl accomplishes the network transfers by calling the +// curl_multi_socket_action function to move pieces of the transfer to or from +// the system's network services. Consequently we call this function when a +// timeout occurs or when activity is detected on a socket. + +int wxWebSessionCURL::TimerCallback(CURLM* WXUNUSED(multi), long timeoutms, + void *userp) +{ + wxWebSessionCURL* session = reinterpret_cast(userp); + session->ProcessTimerCallback(timeoutms); + return 0; +} + +int wxWebSessionCURL::SocketCallback(CURL* WXUNUSED(easy), curl_socket_t sock, + int what, void* userp, void* WXUNUSED(sp)) +{ + wxWebSessionCURL* session = reinterpret_cast(userp); + session->ProcessSocketCallback(sock, what); + return CURLM_OK; +}; + +void wxWebSessionCURL::ProcessTimerCallback(long timeoutms) +{ + // When this callback is called, curl wants us to start or stop a timer. + // If timeoutms = -1, we should stop the timer. If timeoutms > 0, we should + // start a oneshot timer and when that timer expires, we should call + // curl_multi_socket_action(m_handle, CURL_SOCKET_TIMEOUT,... + // + // In the special case that timeoutms = 0, we should signal a timeout as + // soon as possible (as above by calling curl_multi_socket_action). But + // according to the curl documentation, we can't do that from this callback + // or we might cause an infinite loop. So use CallAfter to report the + // timeout at a slightly later time. + + if ( timeoutms > 0) + { + m_timeoutTimer.StartOnce(timeoutms); + } + else if ( timeoutms < 0 ) + { + m_timeoutTimer.Stop(); + } + else // timeoutms == 0 + { + CallAfter(&wxWebSessionCURL::ProcessTimeoutNotification); + } +} + +void wxWebSessionCURL::TimeoutNotification(wxTimerEvent& WXUNUSED(event)) +{ + ProcessTimeoutNotification(); +} + +void wxWebSessionCURL::ProcessTimeoutNotification() +{ + int runningHandles; + curl_multi_socket_action(m_handle, CURL_SOCKET_TIMEOUT, 0,&runningHandles); + + CheckForCompletedTransfers(); +} + +static int CurlPoll2SocketPoller(int what) +{ + int pollAction = SocketPoller::InvalidAction; + + if ( what == CURL_POLL_IN ) + { + pollAction = SocketPoller::PollForRead ; + } + else if ( what == CURL_POLL_OUT ) + { + pollAction = SocketPoller::PollForWrite; + } + else if ( what == CURL_POLL_INOUT ) + { + pollAction = SocketPoller::PollForRead | SocketPoller::PollForWrite; + } + + return pollAction; +} + +void wxWebSessionCURL::ProcessSocketCallback(curl_socket_t s, int what) +{ + // Have the socket poller start or stop monitoring a socket depending of + // the value of what. + + switch ( what ) + { + case CURL_POLL_IN: + wxFALLTHROUGH; + case CURL_POLL_OUT: + wxFALLTHROUGH; + case CURL_POLL_INOUT: + m_socketPoller->StartPolling(s, CurlPoll2SocketPoller(what), this); + break; + case CURL_POLL_REMOVE: + m_socketPoller->StopPolling(s); + break; + default: + wxLogDebug("Unknown socket action in ProcessSocketCallback"); + break; + } +} + +static int SocketPollerResult2CurlSelect(int socketEventFlag) +{ + int curlSelect = 0; + + if ( socketEventFlag & SocketPoller::ReadyForRead ) + { + curlSelect |= CURL_CSELECT_IN; + } + + if ( socketEventFlag & SocketPoller::ReadyForWrite ) + { + curlSelect |= CURL_CSELECT_OUT; + } + + if ( socketEventFlag & SocketPoller::HasError ) + { + curlSelect |= CURL_CSELECT_ERR; + } + + return curlSelect; +} + +void wxWebSessionCURL::ProcessSocketPollerResult(wxThreadEvent& event) +{ + // Convert the socket poller result to an action flag needed by curl. + // Then call curl_multi_socket_action. + curl_socket_t sock = event.GetPayload(); + int action = SocketPollerResult2CurlSelect(event.GetInt()); + int runningHandles; + curl_multi_socket_action(m_handle, sock, action, &runningHandles); + + CheckForCompletedTransfers(); + m_socketPoller->ResumePolling(sock); +} + +void wxWebSessionCURL::CheckForCompletedTransfers() +{ + // Process CURL message queue + int msgQueueCount; + while ( CURLMsg* msg = curl_multi_info_read(m_handle, &msgQueueCount) ) + { + if ( msg->msg == CURLMSG_DONE ) + { + wxWebRequestCURL* request; + curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &request); + curl_multi_remove_handle(m_handle, msg->easy_handle); + request->HandleCompletion(); + } + } +} + #endif // wxUSE_WEBREQUEST_CURL From 38510d8e895ffe10b3573709ec6dfc1a3734eeec Mon Sep 17 00:00:00 2001 From: VZ Date: Tue, 26 Jan 2021 17:29:18 +0100 Subject: [PATCH 02/12] Normalize whitespace No real changes. --- src/common/webrequest_curl.cpp | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/common/webrequest_curl.cpp b/src/common/webrequest_curl.cpp index d82c41e43f..ed484096c8 100644 --- a/src/common/webrequest_curl.cpp +++ b/src/common/webrequest_curl.cpp @@ -390,7 +390,7 @@ public: enum Result { - InvalidResult= 0, + InvalidResult = 0, ReadyForRead = 1, ReadyForWrite = 2, HasError = 4 @@ -435,10 +435,10 @@ private: m_evtHandler = h; } - MessageId GetMessageId() const {return m_messageId;} - wxSOCKET_T GetSocket() const {return m_socket;} - int GetFlags() const {return m_flags;} - wxEvtHandler* GetEvtHandler() const {return m_evtHandler;} + MessageId GetMessageId() const { return m_messageId; } + wxSOCKET_T GetSocket() const { return m_socket; } + int GetFlags() const { return m_flags; } + wxEvtHandler* GetEvtHandler() const { return m_evtHandler; } private: wxSOCKET_T m_socket; @@ -854,7 +854,7 @@ void SocketPoller::ThreadCheckSockets() if ( FD_ISSET(sock, &errorFds) ) { - result |= HasError ; + result |= HasError; } if ( result != InvalidResult && hndlr != NULL ) @@ -891,7 +891,7 @@ wxWebSessionCURL::wxWebSessionCURL() : m_socketPoller = new SocketPoller(); m_timeoutTimer.SetOwner(this); - Bind(wxEVT_TIMER ,&wxWebSessionCURL::TimeoutNotification, this); + Bind(wxEVT_TIMER, &wxWebSessionCURL::TimeoutNotification, this); Bind(wxSocketAction, &wxWebSessionCURL::ProcessSocketPollerResult, this); } @@ -926,7 +926,7 @@ wxWebSessionCURL::CreateRequest(wxWebSession& session, else { curl_multi_setopt(m_handle, CURLMOPT_SOCKETDATA, this); - curl_multi_setopt(m_handle, CURLMOPT_SOCKETFUNCTION,SocketCallback); + curl_multi_setopt(m_handle, CURLMOPT_SOCKETFUNCTION, SocketCallback); curl_multi_setopt(m_handle, CURLMOPT_TIMERDATA, this); curl_multi_setopt(m_handle, CURLMOPT_TIMERFUNCTION, TimerCallback); } @@ -1030,7 +1030,7 @@ void wxWebSessionCURL::TimeoutNotification(wxTimerEvent& WXUNUSED(event)) void wxWebSessionCURL::ProcessTimeoutNotification() { int runningHandles; - curl_multi_socket_action(m_handle, CURL_SOCKET_TIMEOUT, 0,&runningHandles); + curl_multi_socket_action(m_handle, CURL_SOCKET_TIMEOUT, 0, &runningHandles); CheckForCompletedTransfers(); } From f406fcd12ac0701902de7922866c20d10b0e981a Mon Sep 17 00:00:00 2001 From: New Pagodi Date: Sat, 6 Feb 2021 22:57:38 -0600 Subject: [PATCH 03/12] Fix compilation of SocketPoller class on windows --- src/common/webrequest_curl.cpp | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/src/common/webrequest_curl.cpp b/src/common/webrequest_curl.cpp index ed484096c8..30bcc269b4 100644 --- a/src/common/webrequest_curl.cpp +++ b/src/common/webrequest_curl.cpp @@ -28,7 +28,7 @@ #include "wx/hashset.h" #include "wx/hashmap.h" -#ifdef __WXMSW__ +#ifdef __WINDOWS__ #include #else #include @@ -403,14 +403,7 @@ public: void ResumePolling(wxSOCKET_T); private: - enum - { -#ifdef INVALID_SOCKET - SOCKET_POLLER_INVALID_SOCKET = INVALID_SOCKET -#else - SOCKET_POLLER_INVALID_SOCKET = ((wxSOCKET_T)(~0)) -#endif - }; + static const wxSOCKET_T SOCKET_POLLER_INVALID_SOCKET; class Message { @@ -424,7 +417,7 @@ private: LastMessageId }; - Message(MessageId i = LastMessageId, + explicit Message(MessageId i = LastMessageId, wxSOCKET_T s = SOCKET_POLLER_INVALID_SOCKET, int f = 0, wxEvtHandler* h = NULL) @@ -486,10 +479,18 @@ private: wxDEFINE_EVENT(wxSocketAction,wxThreadEvent); +#ifdef INVALID_SOCKET + const wxSOCKET_T SocketPoller::SOCKET_POLLER_INVALID_SOCKET = + INVALID_SOCKET; +#else + const wxSOCKET_T SocketPoller::SOCKET_POLLER_INVALID_SOCKET = + static_cast(~0); +#endif + SocketPoller::SocketPoller() { - m_writeEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET; + m_writeEnd = static_cast(SocketPoller::SOCKET_POLLER_INVALID_SOCKET); m_readEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET; CreateSocketPair(); @@ -516,14 +517,14 @@ SocketPoller::~SocketPoller() CloseSocket(m_writeEnd); CloseSocket(m_readEnd); -#ifdef __WXMSW__ +#ifdef __WINDOWS__ WSACleanup(); #endif } void SocketPoller::CreateSocketPair() { -#ifdef __WXMSW__ +#ifdef __WINDOWS__ WORD wVersionRequested = MAKEWORD(1,1); WSADATA wsaData; WSAStartup(wVersionRequested, &wsaData); @@ -586,7 +587,7 @@ void SocketPoller::CreateSocketPair() void SocketPoller::CloseSocket(wxSOCKET_T s) { -#ifdef __WXMSW__ +#ifdef __WINDOWS__ closesocket(s); #else close(s); From 3e5fd5462de0f6bf1e65180799714907238bc905 Mon Sep 17 00:00:00 2001 From: New Pagodi Date: Sun, 7 Feb 2021 20:37:47 -0600 Subject: [PATCH 04/12] Add a progress callback for wxWebRequestCURL This commit adds a progress callback for use with wxWebRequestCURL objects. This has some complications because over the years curl has changed the signature of the callback. A combination of compile-time and run-time checks is used to make sure the appropriate callback and preferred return value are used. --- include/wx/private/webrequest_curl.h | 5 +++ src/common/webrequest_curl.cpp | 67 +++++++++++++++++++++++++--- 2 files changed, 66 insertions(+), 6 deletions(-) diff --git a/include/wx/private/webrequest_curl.h b/include/wx/private/webrequest_curl.h index b0aa565584..d0e2f8a5d7 100644 --- a/include/wx/private/webrequest_curl.h +++ b/include/wx/private/webrequest_curl.h @@ -114,6 +114,7 @@ public: // Methods called from libcurl callbacks size_t CURLOnWrite(void *buffer, size_t size); size_t CURLOnHeader(const char* buffer, size_t size); + int CURLOnProgress(); private: wxWebRequestHeaderMap m_headers; @@ -149,6 +150,9 @@ public: void CancelRequest(wxWebRequestCURL* request); + static bool CurlRuntimeAtLeastVersion(unsigned int, unsigned int, + unsigned int); + private: static int TimerCallback(CURLM*, long, void*); static int SocketCallback(CURL*, curl_socket_t, int, void*, void*); @@ -165,6 +169,7 @@ private: CURLM* m_handle; static int ms_activeSessions; + static unsigned int ms_runtimeVersion; wxDECLARE_NO_COPY_CLASS(wxWebSessionCURL); }; diff --git a/src/common/webrequest_curl.cpp b/src/common/webrequest_curl.cpp index 30bcc269b4..d247e844af 100644 --- a/src/common/webrequest_curl.cpp +++ b/src/common/webrequest_curl.cpp @@ -44,12 +44,6 @@ (LIBCURL_VERSION_NUM >= CURL_VERSION_BITS(x, y, z)) #endif -#if CURL_AT_LEAST_VERSION(7, 28, 0) - #define wxCURL_HAVE_MULTI_WAIT 1 -#else - #define wxCURL_HAVE_MULTI_WAIT 0 -#endif - // The new name was introduced in curl 7.21.6. #ifndef CURLOPT_ACCEPT_ENCODING #define CURLOPT_ACCEPT_ENCODING CURLOPT_ENCODING @@ -73,12 +67,53 @@ static size_t wxCURLHeader(char *buffer, size_t size, size_t nitems, void *userd return static_cast(userdata)->CURLOnHeader(buffer, size * nitems); } +int wxCURLXferInfo(void* clientp, curl_off_t WXUNUSED(dltotal), + curl_off_t WXUNUSED(dlnow), + curl_off_t WXUNUSED(ultotal), + curl_off_t WXUNUSED(ulnow)) +{ + wxCHECK_MSG( clientp, 0, "invalid curl progress callback data" ); + + wxWebResponseCURL* response = reinterpret_cast(clientp); + return response->CURLOnProgress(); +} + +int wxCURLProgress(void* clientp, double dltotal, double dlnow, double ultotal, + double ulnow) +{ + return wxCURLXferInfo(clientp, static_cast(dltotal), + static_cast(dlnow), + static_cast(ultotal), + static_cast(ulnow)); +} + wxWebResponseCURL::wxWebResponseCURL(wxWebRequestCURL& request) : wxWebResponseImpl(request) { curl_easy_setopt(GetHandle(), CURLOPT_WRITEDATA, static_cast(this)); curl_easy_setopt(GetHandle(), CURLOPT_HEADERDATA, static_cast(this)); + // Set the progress callback. + #if CURL_AT_LEAST_VERSION(7, 32, 0) + if ( wxWebSessionCURL::CurlRuntimeAtLeastVersion(7, 32, 0) ) + { + curl_easy_setopt(GetHandle(), CURLOPT_XFERINFOFUNCTION, + wxCURLXferInfo); + curl_easy_setopt(GetHandle(), CURLOPT_XFERINFODATA, + static_cast(this)); + } + else + #endif + { + curl_easy_setopt(GetHandle(), CURLOPT_PROGRESSFUNCTION, + wxCURLProgress); + curl_easy_setopt(GetHandle(), CURLOPT_PROGRESSDATA, + static_cast(this)); + } + + // Have curl call the progress callback. + curl_easy_setopt(GetHandle(), CURLOPT_NOPROGRESS, 0L); + Init(); } @@ -117,6 +152,11 @@ size_t wxWebResponseCURL::CURLOnHeader(const char * buffer, size_t size) return size; } +int wxWebResponseCURL::CURLOnProgress() +{ + return 0; +} + wxFileOffset wxWebResponseCURL::GetContentLength() const { #if CURL_AT_LEAST_VERSION(7, 55, 0) @@ -877,6 +917,7 @@ void SocketPoller::ThreadCheckSockets() // int wxWebSessionCURL::ms_activeSessions = 0; +unsigned int wxWebSessionCURL::ms_runtimeVersion = 0; wxWebSessionCURL::wxWebSessionCURL() : m_handle(NULL) @@ -885,7 +926,14 @@ wxWebSessionCURL::wxWebSessionCURL() : if ( ms_activeSessions == 0 ) { if ( curl_global_init(CURL_GLOBAL_ALL) ) + { wxLogError(_("libcurl could not be initialized")); + } + else + { + curl_version_info_data* data = curl_version_info(CURLVERSION_NOW); + ms_runtimeVersion = data->version_num; + } } ms_activeSessions++; @@ -970,6 +1018,13 @@ wxVersionInfo wxWebSessionCURL::GetLibraryVersionInfo() desc); } +bool wxWebSessionCURL::CurlRuntimeAtLeastVersion(unsigned int major, + unsigned int minor, + unsigned int patch) +{ + return (ms_runtimeVersion >= CURL_VERSION_BITS(major, minor, patch)); +} + // curl interacts with the wxWebSessionCURL class through 2 callback functions // 1) TimerCallback is called whenever curl wants us to start or stop a timer. // 2) SocketCallback is called when curl wants us to start monitoring a socket From 48cba4f88e7299899d0ed44929aff8d259aec6a7 Mon Sep 17 00:00:00 2001 From: New Pagodi Date: Sun, 7 Feb 2021 20:43:51 -0600 Subject: [PATCH 05/12] Set wxWebResponseCURL buffer size as soon as known Currently for wxWebRequestCURL objects using memory storage, a download is processed by appending to a wxMemoryBuffer each time the write callback is called. For a large transfer, this can result in many, many reallocation calls and can block the main application. This commit uses the progress callback for wxWebRequestCURL objects added in a previous commit to set a minimum size for the buffer as soon as it is known. --- include/wx/private/webrequest.h | 4 ++++ include/wx/private/webrequest_curl.h | 3 ++- src/common/webrequest.cpp | 5 +++++ src/common/webrequest_curl.cpp | 17 ++++++++++++++--- 4 files changed, 25 insertions(+), 4 deletions(-) diff --git a/include/wx/private/webrequest.h b/include/wx/private/webrequest.h index 0812304fac..497bbfc170 100644 --- a/include/wx/private/webrequest.h +++ b/include/wx/private/webrequest.h @@ -187,6 +187,10 @@ protected: void ReportDataReceived(size_t sizeReceived); + // This function can optionally be called to preallocate the read buffer, + // if the total amount of data to be downloaded is known in advance. + void PreAllocBuffer(size_t sizeNeeded); + private: // Called by wxWebRequestImpl only. friend class wxWebRequestImpl; diff --git a/include/wx/private/webrequest_curl.h b/include/wx/private/webrequest_curl.h index d0e2f8a5d7..25b90a80ba 100644 --- a/include/wx/private/webrequest_curl.h +++ b/include/wx/private/webrequest_curl.h @@ -114,11 +114,12 @@ public: // Methods called from libcurl callbacks size_t CURLOnWrite(void *buffer, size_t size); size_t CURLOnHeader(const char* buffer, size_t size); - int CURLOnProgress(); + int CURLOnProgress(curl_off_t); private: wxWebRequestHeaderMap m_headers; wxString m_statusText; + wxFileOffset m_knownDownloadSize; CURL* GetHandle() const { return static_cast(m_request).GetHandle(); } diff --git a/src/common/webrequest.cpp b/src/common/webrequest.cpp index 351a4b76ae..70dbf7232b 100644 --- a/src/common/webrequest.cpp +++ b/src/common/webrequest.cpp @@ -688,6 +688,11 @@ void* wxWebResponseImpl::GetDataBuffer(size_t sizeNeeded) return m_readBuffer.GetAppendBuf(sizeNeeded); } +void wxWebResponseImpl::PreAllocBuffer(size_t sizeNeeded) +{ + m_readBuffer.SetBufSize(sizeNeeded); +} + void wxWebResponseImpl::ReportDataReceived(size_t sizeReceived) { m_readBuffer.UngetAppendBuf(sizeReceived); diff --git a/src/common/webrequest_curl.cpp b/src/common/webrequest_curl.cpp index d247e844af..f8d2aac47d 100644 --- a/src/common/webrequest_curl.cpp +++ b/src/common/webrequest_curl.cpp @@ -67,7 +67,7 @@ static size_t wxCURLHeader(char *buffer, size_t size, size_t nitems, void *userd return static_cast(userdata)->CURLOnHeader(buffer, size * nitems); } -int wxCURLXferInfo(void* clientp, curl_off_t WXUNUSED(dltotal), +int wxCURLXferInfo(void* clientp, curl_off_t dltotal, curl_off_t WXUNUSED(dlnow), curl_off_t WXUNUSED(ultotal), curl_off_t WXUNUSED(ulnow)) @@ -75,7 +75,7 @@ int wxCURLXferInfo(void* clientp, curl_off_t WXUNUSED(dltotal), wxCHECK_MSG( clientp, 0, "invalid curl progress callback data" ); wxWebResponseCURL* response = reinterpret_cast(clientp); - return response->CURLOnProgress(); + return response->CURLOnProgress(dltotal); } int wxCURLProgress(void* clientp, double dltotal, double dlnow, double ultotal, @@ -90,6 +90,8 @@ int wxCURLProgress(void* clientp, double dltotal, double dlnow, double ultotal, wxWebResponseCURL::wxWebResponseCURL(wxWebRequestCURL& request) : wxWebResponseImpl(request) { + m_knownDownloadSize = 0; + curl_easy_setopt(GetHandle(), CURLOPT_WRITEDATA, static_cast(this)); curl_easy_setopt(GetHandle(), CURLOPT_HEADERDATA, static_cast(this)); @@ -152,8 +154,17 @@ size_t wxWebResponseCURL::CURLOnHeader(const char * buffer, size_t size) return size; } -int wxWebResponseCURL::CURLOnProgress() +int wxWebResponseCURL::CURLOnProgress(curl_off_t total) { + if ( m_knownDownloadSize != total ) + { + if ( m_request.GetStorage() == wxWebRequest::Storage_Memory ) + { + PreAllocBuffer(static_cast(total)); + } + m_knownDownloadSize = total; + } + return 0; } From b0b53e8db3d06ed36cef31ec9c6ec90105ef88fc Mon Sep 17 00:00:00 2001 From: New Pagodi Date: Sun, 7 Feb 2021 20:48:12 -0600 Subject: [PATCH 06/12] Check wxWebSessionCURL::StartRequest really starts --- src/common/webrequest_curl.cpp | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/common/webrequest_curl.cpp b/src/common/webrequest_curl.cpp index f8d2aac47d..417541b9cf 100644 --- a/src/common/webrequest_curl.cpp +++ b/src/common/webrequest_curl.cpp @@ -999,15 +999,23 @@ bool wxWebSessionCURL::StartRequest(wxWebRequestCURL & request) { // Add request easy handle to multi handle CURL* curl = request.GetHandle(); - curl_multi_add_handle(m_handle, curl); + int code = curl_multi_add_handle(m_handle, curl); - request.SetState(wxWebRequest::State_Active); + if ( code == CURLM_OK ) + { + request.SetState(wxWebRequest::State_Active); - // Report a timeout to curl to initiate this transfer. - int runningHandles; - curl_multi_socket_action(m_handle, CURL_SOCKET_TIMEOUT, 0, &runningHandles); + // Report a timeout to curl to initiate this transfer. + int runningHandles; + curl_multi_socket_action(m_handle, CURL_SOCKET_TIMEOUT, 0, + &runningHandles); - return true; + return true; + } + else + { + return false; + } } void wxWebSessionCURL::CancelRequest(wxWebRequestCURL* request) From 774a752bed88e7d1fdc6245cb031acf042f828e8 Mon Sep 17 00:00:00 2001 From: New Pagodi Date: Sun, 7 Feb 2021 00:48:02 -0600 Subject: [PATCH 07/12] Track active transfers in wxWebSessionCURL class MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit At various points in a transfer being managed by the wxWebSessionCURL class we need to perform operations on a wxWebRequestCURL object. However it’s possible for the request object to be deleted while the transfer is in progress. To ensure that the request objects are valid, keep track of the request objects with a hash map. Objects are added to the map when a transfer is started and removed when the transfer is complete or in the request’s destructor. --- include/wx/private/webrequest_curl.h | 8 +++++ src/common/webrequest_curl.cpp | 49 +++++++++++++++++++++++----- 2 files changed, 48 insertions(+), 9 deletions(-) diff --git a/include/wx/private/webrequest_curl.h b/include/wx/private/webrequest_curl.h index 25b90a80ba..dd1e4bf4ea 100644 --- a/include/wx/private/webrequest_curl.h +++ b/include/wx/private/webrequest_curl.h @@ -17,6 +17,7 @@ #include "wx/thread.h" #include "wx/vector.h" #include "wx/timer.h" +#include "wx/hashmap.h" #include "curl/curl.h" @@ -151,6 +152,8 @@ public: void CancelRequest(wxWebRequestCURL* request); + void RequestHasTerminated(wxWebRequestCURL* request); + static bool CurlRuntimeAtLeastVersion(unsigned int, unsigned int, unsigned int); @@ -165,6 +168,11 @@ private: void ProcessSocketPollerResult(wxThreadEvent&); void CheckForCompletedTransfers(); + WX_DECLARE_HASH_MAP(CURL*, wxWebRequestCURL*, wxPointerHash, \ + wxPointerEqual, TransferSet); + + TransferSet m_activeTransfers; + SocketPoller* m_socketPoller; wxTimer m_timeoutTimer; CURLM* m_handle; diff --git a/src/common/webrequest_curl.cpp b/src/common/webrequest_curl.cpp index 417541b9cf..d608192cc8 100644 --- a/src/common/webrequest_curl.cpp +++ b/src/common/webrequest_curl.cpp @@ -240,8 +240,6 @@ wxWebRequestCURL::wxWebRequestCURL(wxWebSession & session, // Set error buffer to get more detailed CURL status m_errorBuffer[0] = '\0'; curl_easy_setopt(m_handle, CURLOPT_ERRORBUFFER, m_errorBuffer); - // Set this request in the private pointer - curl_easy_setopt(m_handle, CURLOPT_PRIVATE, static_cast(this)); // Set URL to handle: note that we must use wxURI to escape characters not // allowed in the URLs correctly (URL API is only available in libcurl // since the relatively recent v7.62.0, so we don't want to rely on it). @@ -266,8 +264,7 @@ wxWebRequestCURL::wxWebRequestCURL(wxWebSession & session, wxWebRequestCURL::~wxWebRequestCURL() { DestroyHeaderList(); - - curl_easy_cleanup(m_handle); + m_sessionImpl.RequestHasTerminated(this); } void wxWebRequestCURL::Start() @@ -1004,6 +1001,7 @@ bool wxWebSessionCURL::StartRequest(wxWebRequestCURL & request) if ( code == CURLM_OK ) { request.SetState(wxWebRequest::State_Active); + m_activeTransfers[curl] = &request; // Report a timeout to curl to initiate this transfer. int runningHandles; @@ -1020,10 +1018,37 @@ bool wxWebSessionCURL::StartRequest(wxWebRequestCURL & request) void wxWebSessionCURL::CancelRequest(wxWebRequestCURL* request) { - curl_multi_remove_handle(m_handle, request->GetHandle()); + CURL* curl = request->GetHandle(); + TransferSet::iterator it = m_activeTransfers.find(curl); + + if ( it != m_activeTransfers.end() ) + { + m_activeTransfers.erase(it); + } + + curl_multi_remove_handle(m_handle, curl); request->SetState(wxWebRequest::State_Cancelled); } +void wxWebSessionCURL::RequestHasTerminated(wxWebRequestCURL* request) +{ + CURL* curl = request->GetHandle(); + TransferSet::iterator it = m_activeTransfers.find(curl); + + if ( it != m_activeTransfers.end() ) + { + // The transfer the CURL handle is performing is still in progress, but + // the web request object it belongs to is being deleted. Since the + // next step will call curl_easy_cleanup and any calls on the CURL + // handle after cleanup are illegal, remove it from the CURLM + // multihandle now. + curl_multi_remove_handle(m_handle, curl); + m_activeTransfers.erase(it); + } + + curl_easy_cleanup(curl); +} + wxVersionInfo wxWebSessionCURL::GetLibraryVersionInfo() { const curl_version_info_data* vi = curl_version_info(CURLVERSION_NOW); @@ -1196,10 +1221,16 @@ void wxWebSessionCURL::CheckForCompletedTransfers() { if ( msg->msg == CURLMSG_DONE ) { - wxWebRequestCURL* request; - curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &request); - curl_multi_remove_handle(m_handle, msg->easy_handle); - request->HandleCompletion(); + CURL* curl = msg->easy_handle; + TransferSet::iterator it = m_activeTransfers.find(curl); + + if ( it != m_activeTransfers.end() ) + { + wxWebRequestCURL* request = it->second; + curl_multi_remove_handle(m_handle, curl); + request->HandleCompletion(); + m_activeTransfers.erase(it); + } } } } From 88dca37b3f2ff8572dbbd3e764f0cb2e191c1cf1 Mon Sep 17 00:00:00 2001 From: New Pagodi Date: Sun, 7 Feb 2021 16:34:07 -0600 Subject: [PATCH 08/12] Change cancel method for wxWebRequestCURL objects MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously wxWebRequestCURL objects were canceled by removing their CURL easy handle from the CURLM multihandle. Unfortunately the really only pauses the connection and does not truly cancel it. This commit tries to stop the transfer by retrieving the active socket from the CURL handle for the transfer and closing it. There are some complications in doing this because the option curl uses to get the socket have changed over the years. A combination of compile time and run time checks are used to use the appropriate options to get the socket. However in the case of 64bit windows using a curl version older than 7.45.0 simply won’t have an usable option. In this case, it seems nothing can be done. --- include/wx/private/webrequest_curl.h | 1 + src/common/webrequest_curl.cpp | 65 ++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+) diff --git a/include/wx/private/webrequest_curl.h b/include/wx/private/webrequest_curl.h index dd1e4bf4ea..b106cef287 100644 --- a/include/wx/private/webrequest_curl.h +++ b/include/wx/private/webrequest_curl.h @@ -167,6 +167,7 @@ private: void ProcessSocketCallback(curl_socket_t, int); void ProcessSocketPollerResult(wxThreadEvent&); void CheckForCompletedTransfers(); + void StopTransfer(CURL*); WX_DECLARE_HASH_MAP(CURL*, wxWebRequestCURL*, wxPointerHash, \ wxPointerEqual, TransferSet); diff --git a/src/common/webrequest_curl.cpp b/src/common/webrequest_curl.cpp index d608192cc8..3b98f9e992 100644 --- a/src/common/webrequest_curl.cpp +++ b/src/common/webrequest_curl.cpp @@ -1027,6 +1027,8 @@ void wxWebSessionCURL::CancelRequest(wxWebRequestCURL* request) } curl_multi_remove_handle(m_handle, curl); + StopTransfer(curl); + request->SetState(wxWebRequest::State_Cancelled); } @@ -1235,4 +1237,67 @@ void wxWebSessionCURL::CheckForCompletedTransfers() } } +void wxWebSessionCURL::StopTransfer(CURL* curl) +{ + curl_socket_t activeSocket; + bool closeActiveSocket = true; + bool useLastSocket = false; + +#if CURL_AT_LEAST_VERSION(7, 45, 0) + if ( CurlRuntimeAtLeastVersion(7, 45, 0) ) + { + CURLcode code = curl_easy_getinfo(curl, CURLINFO_ACTIVESOCKET, + &activeSocket); + + if ( code != CURLE_OK || activeSocket == CURL_SOCKET_BAD ) + { + closeActiveSocket = false; + } + } + else + { + useLastSocket = true; + } +#else + useLastSocket = true; +#endif //CURL_AT_LEAST_VERSION(7, 45, 0) + + // CURLINFO_ACTIVESOCKET is not available either at compile time or run + // time. So we must use the older CURLINFO_LASTSOCKET instead. + if ( useLastSocket ) + { + #ifdef __WIN64__ + // CURLINFO_LASTSOCKET won't work on 64 bit windows because it + // uses a long to retrive the socket. However sockets will be 64 + // bit values. In this case there is nothing we can do. + closeActiveSocket = false; + #endif //__WIN64__ + + if ( closeActiveSocket ) + { + long longSocket; + CURLcode code = curl_easy_getinfo(curl, CURLINFO_LASTSOCKET, + &longSocket); + + if ( code == CURLE_OK && longSocket!= -1 ) + { + activeSocket = static_cast(longSocket); + } + else + { + closeActiveSocket = false; + } + } + } + + if ( closeActiveSocket ) + { + #ifdef __WINDOWS__ + closesocket(activeSocket); + #else + close(activeSocket); + #endif + } +} + #endif // wxUSE_WEBREQUEST_CURL From 252a920bd7e9e4a6c96440c37ecf9a579c25b98d Mon Sep 17 00:00:00 2001 From: New Pagodi Date: Sun, 7 Feb 2021 20:55:24 -0600 Subject: [PATCH 09/12] Use main thread for the socket poller if possible On windows and systems where wxUSE_EVENTLOOP_SOURCE is 1, it is possible to monitor socket descriptors for activity from the main thread. The SocketPoller class used with wxWebSessionCURL is modified to use an implementation class that does so. On windows, the implementation uses the winsock1 function WSAAsyncSelect to send events to wxWebSessionCURL when activity is detected. When wxUSE_EVENTLOOP_SOURCE is 1, a wxEventLoopSource is used to monitor for socket activity. The event loop source is given a custom wxEventLoopSourceHandler object to send the necessary event. --- src/common/webrequest_curl.cpp | 778 +++++++++++++++------------------ 1 file changed, 363 insertions(+), 415 deletions(-) diff --git a/src/common/webrequest_curl.cpp b/src/common/webrequest_curl.cpp index 3b98f9e992..be444d4f0e 100644 --- a/src/common/webrequest_curl.cpp +++ b/src/common/webrequest_curl.cpp @@ -24,19 +24,17 @@ #include "wx/uri.h" #include "wx/socket.h" -#include "wx/msgqueue.h" -#include "wx/hashset.h" -#include "wx/hashmap.h" +#include "wx/evtloop.h" #ifdef __WINDOWS__ - #include + #include "wx/hashset.h" + #include "wx/msw/wrapwin.h" #else - #include - #include + #include "wx/evtloopsrc.h" + #include "wx/evtloop.h" #endif - // Define symbols that might be missing from older libcurl headers #ifndef CURL_AT_LEAST_VERSION #define CURL_VERSION_BITS(x,y,z) ((x)<<16|(y)<<8|z) @@ -422,503 +420,451 @@ void wxWebAuthChallengeCURL::SetCredentials(const wxWebCredentials& cred) m_request.StartRequest(); } +// +// SocketPoller - a helper class for wxWebSessionCURL +// -wxDECLARE_EVENT(wxSocketAction,wxThreadEvent); +wxDECLARE_EVENT(wxEVT_SOCKET_POLLER_RESULT, wxThreadEvent); -class SocketPoller: public wxThreadHelper +class SocketPollerImpl; + +class SocketPoller { public: enum PollAction { - InvalidAction = 0, - PollForRead = 1, - PollForWrite = 2, - PollForError = 4 + INVALID_ACTION = 0x00, + POLL_FOR_READ = 0x01, + POLL_FOR_WRITE = 0x02 }; enum Result { - InvalidResult = 0, - ReadyForRead = 1, - ReadyForWrite = 2, - HasError = 4 + INVALID_RESULT = 0x00, + READY_FOR_READ = 0x01, + READY_FOR_WRITE = 0x02, + HAS_ERROR = 0x04 }; - SocketPoller(); + SocketPoller(wxEvtHandler*); ~SocketPoller(); - bool StartPolling(wxSOCKET_T, int, wxEvtHandler*); + bool StartPolling(wxSOCKET_T, int); void StopPolling(wxSOCKET_T); void ResumePolling(wxSOCKET_T); private: - static const wxSOCKET_T SOCKET_POLLER_INVALID_SOCKET; - - class Message - { - public: - enum MessageId - { - AddSocketAction, - DeleteSocketAction, - ResumePolling, - Quit, - LastMessageId - }; - - explicit Message(MessageId i = LastMessageId, - wxSOCKET_T s = SOCKET_POLLER_INVALID_SOCKET, - int f = 0, - wxEvtHandler* h = NULL) - { - m_messageId = i; - m_socket = s; - m_flags = f; - m_evtHandler = h; - } - - MessageId GetMessageId() const { return m_messageId; } - wxSOCKET_T GetSocket() const { return m_socket; } - int GetFlags() const { return m_flags; } - wxEvtHandler* GetEvtHandler() const { return m_evtHandler; } - - private: - wxSOCKET_T m_socket; - int m_flags; - MessageId m_messageId; - wxEvtHandler* m_evtHandler; - }; - - struct SocketData - { - int m_pollAction; - void* m_event; - wxEvtHandler* m_handler; - }; - - WX_DECLARE_HASH_SET(wxSOCKET_T, wxIntegerHash, wxIntegerEqual, SocketSet ); - WX_DECLARE_HASH_MAP(wxSOCKET_T, SocketData, wxIntegerHash, wxIntegerEqual, \ - SocketDataMap ); - - // Items used from main thread. - void CreateSocketPair(); - void CloseSocket(wxSOCKET_T); - void PostAndSignal(const Message&); - - SocketSet m_polledSockets; - wxSOCKET_T m_writeEnd; - - - // Items used in both threads. - wxMessageQueue m_msgQueue; - wxSOCKET_T m_readEnd; - - - // Items used from worker thread. - virtual wxThread::ExitCode Entry() wxOVERRIDE; - - void ThreadRemoveFromDL(wxSOCKET_T); - void ThreadSetSocketAction(wxSOCKET_T, int,wxEvtHandler*); - void ThreadDeleteSocketAction(wxSOCKET_T); - void ThreadCheckSockets(); - - SocketDataMap m_socketData; - SocketSet m_disabledList; + SocketPollerImpl* m_impl; }; -wxDEFINE_EVENT(wxSocketAction,wxThreadEvent); +wxDEFINE_EVENT(wxEVT_SOCKET_POLLER_RESULT, wxThreadEvent); -#ifdef INVALID_SOCKET - const wxSOCKET_T SocketPoller::SOCKET_POLLER_INVALID_SOCKET = - INVALID_SOCKET; -#else - const wxSOCKET_T SocketPoller::SOCKET_POLLER_INVALID_SOCKET = - static_cast(~0); -#endif - - -SocketPoller::SocketPoller() +class SocketPollerImpl { - m_writeEnd = static_cast(SocketPoller::SOCKET_POLLER_INVALID_SOCKET); - m_readEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET; - CreateSocketPair(); +public: + virtual ~SocketPollerImpl(){}; + virtual bool StartPolling(wxSOCKET_T, int) = 0; + virtual void StopPolling(wxSOCKET_T) = 0; + virtual void ResumePolling(wxSOCKET_T) = 0; - if (CreateThread(wxTHREAD_JOINABLE) != wxTHREAD_NO_ERROR) - { - wxLogDebug("Could not create socket poller worker thread!"); - return; - } + static SocketPollerImpl* Create(wxEvtHandler*); +}; - if (GetThread()->Run() != wxTHREAD_NO_ERROR) - { - wxLogDebug("Could not run socket poller worker thread!"); - return; - } +SocketPoller::SocketPoller(wxEvtHandler* hndlr) +{ + m_impl = SocketPollerImpl::Create(hndlr); } SocketPoller::~SocketPoller() { - SocketPoller::Message msg(Message::Quit); - PostAndSignal(msg); - - GetThread()->Wait(); - - CloseSocket(m_writeEnd); - CloseSocket(m_readEnd); - -#ifdef __WINDOWS__ - WSACleanup(); -#endif + delete m_impl; } -void SocketPoller::CreateSocketPair() +bool SocketPoller::StartPolling(wxSOCKET_T sock, int pollAction) { + return m_impl->StartPolling(sock, pollAction); +} +void SocketPoller::StopPolling(wxSOCKET_T sock) +{ + m_impl->StopPolling(sock); +} + +void SocketPoller::ResumePolling(wxSOCKET_T sock) +{ + m_impl->ResumePolling(sock); +} + #ifdef __WINDOWS__ + +class WinSock1SocketPoller: public SocketPollerImpl +{ +public: + WinSock1SocketPoller(wxEvtHandler*); + virtual ~WinSock1SocketPoller(); + virtual bool StartPolling(wxSOCKET_T, int) wxOVERRIDE; + virtual void StopPolling(wxSOCKET_T) wxOVERRIDE; + virtual void ResumePolling(wxSOCKET_T) wxOVERRIDE; + +private: + static LRESULT CALLBACK MsgProc(HWND hwnd, WXUINT uMsg, WXWPARAM wParam, + WXLPARAM lParam); + static const WXUINT SOCKET_MESSAGE; + + WX_DECLARE_HASH_SET(wxSOCKET_T, wxIntegerHash, wxIntegerEqual, SocketSet); + + SocketSet m_polledSockets; + WXHWND m_hwnd; +}; + +const WXUINT WinSock1SocketPoller::SOCKET_MESSAGE = WM_USER + 1; + +WinSock1SocketPoller::WinSock1SocketPoller(wxEvtHandler* hndlr) +{ + // Initialize winsock in case it's not already done. WORD wVersionRequested = MAKEWORD(1,1); WSADATA wsaData; WSAStartup(wVersionRequested, &wsaData); - m_writeEnd = socket(AF_INET, SOCK_DGRAM, 0); + // Create a dummy message only window. + m_hwnd = CreateWindowEx( + 0, //DWORD dwExStyle, + TEXT("STATIC"), //LPCSTR lpClassName, + NULL, //LPCSTR lpWindowName, + 0, //DWORD dwStyle, + 0, //int X, + 0, //int Y, + 0, //int nWidth, + 0, //int nHeight, + HWND_MESSAGE, //HWND hWndParent, + NULL, //HMENU hMenu, + NULL, //HINSTANCE hInstance, + NULL //LPVOID lpParam + ); - if ( m_writeEnd == INVALID_SOCKET ) + if ( m_hwnd == NULL ) { - wxLogDebug("Unable to create write end of socket pair."); - m_writeEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET; + wxLogError("Unable to create message window for WinSock1SocketPoller"); return; } - m_readEnd = socket(AF_INET, SOCK_DGRAM, 0); - - if ( m_readEnd == INVALID_SOCKET ) - { - wxLogDebug("Unable to create read end of socket pair."); - CloseSocket(m_writeEnd); - m_writeEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET; - m_readEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET; - return; - } - - // Bind the read end. This will assign it to an unused port. - struct sockaddr_in serverAddr; - serverAddr.sin_family = AF_INET; - serverAddr.sin_port = htons(0); - serverAddr.sin_addr.s_addr = inet_addr("127.0.0.1"); - - int retcode = bind(m_readEnd, (struct sockaddr *)&serverAddr, - sizeof(serverAddr)); - if ( retcode < 0 ) - { - wxLogDebug("Unable bind socket to port."); - CloseSocket(m_writeEnd); - m_writeEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET; - CloseSocket(m_readEnd); - m_readEnd = SocketPoller::SOCKET_POLLER_INVALID_SOCKET; - return; - } - - // Get the ip address and port of the read end. - struct sockaddr_in readEndAddr; - ZeroMemory(&readEndAddr, sizeof(readEndAddr)); - int len = sizeof(readEndAddr); - getsockname(m_readEnd, (struct sockaddr *) &readEndAddr, &len); - - // Unlike with stream sockets, this is a synchronous operation and just - // sets location for the send function. - connect(m_writeEnd, (struct sockaddr *)&readEndAddr, sizeof(readEndAddr)); -#else - int fd[2]; - socketpair(AF_UNIX, SOCK_STREAM, 0, fd); - - m_writeEnd = fd[0]; - m_readEnd = fd[1]; -#endif + // Set the event handler to be the message window's user data. Also set the + // message window to use our MsgProc to process messages it receives. + SetWindowLongPtr(m_hwnd, GWLP_USERDATA, reinterpret_cast(hndlr)); + SetWindowLongPtr(m_hwnd, GWLP_WNDPROC, + reinterpret_cast(WinSock1SocketPoller::MsgProc)); } -void SocketPoller::CloseSocket(wxSOCKET_T s) +WinSock1SocketPoller::~WinSock1SocketPoller() { -#ifdef __WINDOWS__ - closesocket(s); -#else - close(s); -#endif + // Stop monitoring any leftover sockets. + for ( SocketSet::iterator it = m_polledSockets.begin() ; + it != m_polledSockets.end() ; ++it ) + { + WSAAsyncSelect(*it, m_hwnd, 0, 0); + } + + // Close the message window. + if ( m_hwnd ) + { + CloseWindow(m_hwnd); + } + + // Cleanup winsock. + WSACleanup(); } -bool SocketPoller::StartPolling(wxSOCKET_T s, int flags, wxEvtHandler* hndlr) +bool WinSock1SocketPoller::StartPolling(wxSOCKET_T sock, int pollAction) { - SocketSet::iterator it = m_polledSockets.find(s); + StopPolling(sock); - if ( m_polledSockets.size() > 1023 ) + // Convert pollAction to a flag that can be used by winsock. + int winActions = 0; + + if ( pollAction & SocketPoller::POLL_FOR_READ ) { - if ( it == m_polledSockets.end() ) - { - return false; - } + winActions |= FD_READ; } - if ( it == m_polledSockets.end() ) + if ( pollAction & SocketPoller::POLL_FOR_WRITE ) { - m_polledSockets.insert(s); + winActions |= FD_WRITE; } - SocketPoller::Message msg(Message::AddSocketAction, s, flags, hndlr); - PostAndSignal(msg); + // Have winsock send a message to our window whenever activity is + // detected on the socket. + WSAAsyncSelect(sock, m_hwnd, SOCKET_MESSAGE, winActions); + m_polledSockets.insert(sock); return true; } -void SocketPoller::StopPolling(wxSOCKET_T s) +void WinSock1SocketPoller::StopPolling(wxSOCKET_T sock) { - SocketPoller::Message msg(Message::DeleteSocketAction, s); - PostAndSignal(msg); - - SocketSet::iterator it = m_polledSockets.find(s); + SocketSet::iterator it = m_polledSockets.find(sock); if ( it != m_polledSockets.end() ) { + // Stop sending messages when there is activity on the socket. + WSAAsyncSelect(sock, m_hwnd, 0, 0); m_polledSockets.erase(it); } } -void SocketPoller::ResumePolling(wxSOCKET_T s) +void WinSock1SocketPoller::ResumePolling(wxSOCKET_T WXUNUSED(sock)) { - SocketPoller::Message msg(Message::ResumePolling, s); - PostAndSignal(msg); } -void SocketPoller::PostAndSignal(const Message& msg) +LRESULT CALLBACK WinSock1SocketPoller::MsgProc(WXHWND hwnd, WXUINT uMsg, + WXWPARAM wParam, WXLPARAM lParam) { - m_msgQueue.Post(msg); + // We only handle 1 message - the message we told winsock to send when + // it notices activity on sockets we are monitoring. - if ( m_writeEnd != SocketPoller::SOCKET_POLLER_INVALID_SOCKET ) + if ( uMsg == SOCKET_MESSAGE ) { - char c = 32; - send(m_writeEnd, &c, 1, 0); + // Extract the result any any errors from lParam. + int winResult = LOWORD(lParam); + int error = HIWORD(lParam); + + // Convert the result/errors to a SocketPoller::Result flag. + int pollResult = 0; + + if ( winResult & FD_READ ) + { + pollResult |= SocketPoller::READY_FOR_READ; + } + + if ( winResult & FD_WRITE ) + { + pollResult |= SocketPoller::READY_FOR_WRITE; + } + + if ( error != 0 ) + { + pollResult |= SocketPoller::HAS_ERROR; + } + + // If there is a significant result, send an event. + if ( pollResult != 0 ) + { + // The event handler is stored in the window's user data and the + // socket with activity is given by wParam. + LONG_PTR userData = GetWindowLongPtr(hwnd, GWLP_USERDATA); + wxEvtHandler* hndlr = reinterpret_cast(userData); + wxSOCKET_T sock = wParam; + + wxThreadEvent* event = + new wxThreadEvent(wxEVT_SOCKET_POLLER_RESULT); + event->SetPayload(sock); + event->SetInt(pollResult); + + if ( wxThread::IsMain() ) + { + hndlr->ProcessEvent(*event); + delete event; + } + else + { + wxQueueEvent(hndlr, event); + } + } + + return 0; + } + else + { + return DefWindowProc(hwnd, uMsg, wParam, lParam); } } -wxThread::ExitCode SocketPoller::Entry() +SocketPollerImpl* SocketPollerImpl::Create(wxEvtHandler* hndlr) { - wxMessageQueueError er; - SocketPoller::Message msg; - bool done = false; - size_t socketsUnderConsideration = 0; - int timeOut = 50; - Message::MessageId id; - - while ( !done ) - { - if ( GetThread()->TestDestroy() ) - { - break; - } - - // Process all messages in the message queue. If we currently have no - // sockets, wait a little for a message to come in. - timeOut = (socketsUnderConsideration == 0) ? 50 : 0; - er = wxMSGQUEUE_NO_ERROR; - - while ( er == wxMSGQUEUE_NO_ERROR ) - { - er = m_msgQueue.ReceiveTimeout(timeOut, msg); - id = msg.GetMessageId(); - - if ( er == wxMSGQUEUE_TIMEOUT ) - { - break; - } - else if ( er == wxMSGQUEUE_MISC_ERROR ) - { - wxLogDebug("Error with socket poller message queue."); - done = true; - break; - } - else if ( id == Message::Quit ) - { - done = true; - break; - } - else if ( id == Message::AddSocketAction ) - { - wxSOCKET_T s = msg.GetSocket(); - int f = msg.GetFlags(); - wxEvtHandler* hndlr = msg.GetEvtHandler(); - ThreadSetSocketAction(s, f, hndlr); - } - else if ( id == Message::DeleteSocketAction ) - { - wxSOCKET_T s = msg.GetSocket(); - ThreadDeleteSocketAction(s); - } - else if ( id == Message::ResumePolling ) - { - wxSOCKET_T s = msg.GetSocket(); - ThreadRemoveFromDL(s); - } - - timeOut = 0; - } - - socketsUnderConsideration = m_socketData.size() - m_disabledList.size(); - - if ( socketsUnderConsideration > 0 && !done ) - { - ThreadCheckSockets(); - } - } - - return static_cast(0); + return new WinSock1SocketPoller(hndlr); } -void SocketPoller::ThreadRemoveFromDL(wxSOCKET_T s) -{ - SocketSet::iterator it = m_disabledList.find(s); +#else - if ( it != m_disabledList.end() ) +// SocketPollerSourceHandler - a source handler used by the SocketPoller class. + +class SocketPollerSourceHandler: public wxEventLoopSourceHandler +{ +public: + SocketPollerSourceHandler(wxSOCKET_T, wxEvtHandler*); + + void OnReadWaiting() wxOVERRIDE; + void OnWriteWaiting() wxOVERRIDE; + void OnExceptionWaiting() wxOVERRIDE; + ~SocketPollerSourceHandler(){} +private: + void SendEvent(int); + wxSOCKET_T m_socket; + wxEvtHandler* m_handler; +}; + +SocketPollerSourceHandler::SocketPollerSourceHandler(wxSOCKET_T sock, + wxEvtHandler* hndlr) +{ + m_socket = sock; + m_handler = hndlr; +} + +void SocketPollerSourceHandler::OnReadWaiting() +{ + SendEvent(SocketPoller::READY_FOR_READ); +} + +void SocketPollerSourceHandler::OnWriteWaiting() +{ + SendEvent(SocketPoller::READY_FOR_WRITE); +} + +void SocketPollerSourceHandler::OnExceptionWaiting() +{ + SendEvent(SocketPoller::HAS_ERROR); +} + +void SocketPollerSourceHandler::SendEvent(int result) +{ + wxThreadEvent event(wxEVT_SOCKET_POLLER_RESULT); + event.SetPayload(m_socket); + event.SetInt(result); + m_handler->ProcessEvent(event); +} + +// SourceSocketPoller - a SocketPollerImpl based on event loop sources. + +class SourceSocketPoller: public SocketPollerImpl +{ +public: + SourceSocketPoller(wxEvtHandler*); + ~SourceSocketPoller(); + bool StartPolling(wxSOCKET_T, int) wxOVERRIDE; + void StopPolling(wxSOCKET_T) wxOVERRIDE; + void ResumePolling(wxSOCKET_T) wxOVERRIDE; + +private: + WX_DECLARE_HASH_MAP(wxSOCKET_T, wxEventLoopSource*, wxIntegerHash,\ + wxIntegerEqual, SocketDataMap); + + void CleanUpSocketSource(wxEventLoopSource*); + + SocketDataMap m_socketData; + wxEvtHandler* m_handler; +}; + +SourceSocketPoller::SourceSocketPoller(wxEvtHandler* hndlr) +{ + m_handler = hndlr; +} + +SourceSocketPoller::~SourceSocketPoller() +{ + // Clean up any leftover socket data. + for ( SocketDataMap::iterator it = m_socketData.begin() ; + it != m_socketData.end() ; ++it ) { - m_disabledList.erase(it); + CleanUpSocketSource(it->second); } } -void SocketPoller::ThreadDeleteSocketAction(wxSOCKET_T sock) +static int SocketPoller2EventSource(int pollAction) { - ThreadRemoveFromDL(sock); + // Convert the SocketPoller::PollAction value to a flag that can be used + // by wxEventLoopSource. + // Always check for errors. + int eventSourceFlag = wxEVENT_SOURCE_EXCEPTION; + + if ( pollAction & SocketPoller::POLL_FOR_READ ) + { + eventSourceFlag |= wxEVENT_SOURCE_INPUT; + } + + if ( pollAction & SocketPoller::POLL_FOR_WRITE ) + { + eventSourceFlag |= wxEVENT_SOURCE_OUTPUT; + } + + return eventSourceFlag; +} + +bool SourceSocketPoller::StartPolling(wxSOCKET_T sock, int pollAction) +{ + SocketDataMap::iterator it = m_socketData.find(sock); + wxEventLoopSourceHandler* srcHandler = NULL; + + if ( it != m_socketData.end() ) + { + // If this socket is already being polled, reuse the old handler. Also + // delete the old source object to stop the old polling operations. + wxEventLoopSource* oldSrc = it->second; + srcHandler = oldSrc->GetHandler(); + + delete oldSrc; + } + else + { + // Otherwise create a new source handler. + srcHandler = + new SocketPollerSourceHandler(sock, m_handler); + } + + // Get a new source object for these polling checks. + bool socketIsPolled = true; + int eventSourceFlag = SocketPoller2EventSource(pollAction); + wxEventLoopSource* newSrc = + wxEventLoopBase::AddSourceForFD(sock, srcHandler, eventSourceFlag); + + if ( newSrc == NULL ) + { + // We were not able to add a source for this socket. + wxLogDebug(wxString::Format( + "Unable to create event loop source for %d", + static_cast(sock))); + + delete srcHandler; + socketIsPolled = false; + + if ( it != m_socketData.end() ) + { + m_socketData.erase(it); + } + } + else + { + m_socketData[sock] = newSrc; + } + + return socketIsPolled; +} + +void SourceSocketPoller::StopPolling(wxSOCKET_T sock) +{ SocketDataMap::iterator it = m_socketData.find(sock); if ( it != m_socketData.end() ) { + CleanUpSocketSource(it->second); m_socketData.erase(it); } } -void SocketPoller::ThreadSetSocketAction(wxSOCKET_T sock, int flags, - wxEvtHandler* hndlr) +void SourceSocketPoller::ResumePolling(wxSOCKET_T WXUNUSED(sock)) { - ThreadDeleteSocketAction(sock); - - SocketData data; - data.m_pollAction = flags; - data.m_event = NULL; - data.m_handler = hndlr; - - m_socketData[sock] = data; } -void SocketPoller::ThreadCheckSockets() +void SourceSocketPoller::CleanUpSocketSource(wxEventLoopSource* source) { - fd_set readFds, writeFds, errorFds; - FD_ZERO(&readFds); - FD_ZERO(&writeFds); - FD_ZERO(&errorFds); - - wxSOCKET_T maxSd = 0; - - if ( m_readEnd != SocketPoller::SOCKET_POLLER_INVALID_SOCKET ) - { - FD_SET(m_readEnd, &readFds); - maxSd = m_readEnd; - } - - for ( SocketDataMap::iterator it = m_socketData.begin() ; - it != m_socketData.end() ; ++it ) - { - wxSOCKET_T sock = it->first; - - if ( m_disabledList.find(sock) != m_disabledList.end() ) - { - continue; - } - - int checkAction = it->second.m_pollAction; - - if ( checkAction & PollForRead ) - { - FD_SET(sock, &readFds); - } - - if ( checkAction & PollForWrite ) - { - FD_SET(sock, &writeFds); - } - - FD_SET(sock, &errorFds); - - if ( sock > maxSd ) - { - maxSd = sock; - } - } - - struct timeval timeout; - timeout.tv_sec = 0; // 1s timeout - timeout.tv_usec = 50*1000; - - int selectStatus = select(maxSd+1, &readFds, &writeFds, &errorFds,&timeout); - - if ( selectStatus < 0 ) - { - // Massive error: do something - } - else if ( selectStatus == 0 ) - { - ;// select timed out. There is no need to do anything. - } - else - { - if ( (m_readEnd != SocketPoller::SOCKET_POLLER_INVALID_SOCKET) - && FD_ISSET(m_readEnd, &readFds) ) - { - char c; - recv(m_readEnd, &c, 1, 0); - } - - for ( SocketDataMap::iterator it = m_socketData.begin() ; - it != m_socketData.end() ; ++it ) - { - wxSOCKET_T sock = it->first; - - if ( m_disabledList.find(sock) != m_disabledList.end() ) - { - continue; - } - - int checkActions = it->second.m_pollAction; - wxEvtHandler* hndlr = it->second.m_handler; - int result = InvalidResult; - - if ( checkActions & PollForRead ) - { - if ( FD_ISSET(sock, &readFds) ) - { - result |= ReadyForRead; - } - } - - if ( checkActions & ReadyForWrite ) - { - if ( FD_ISSET(sock, &writeFds) ) - { - result |= ReadyForWrite; - } - } - - if ( FD_ISSET(sock, &errorFds) ) - { - result |= HasError; - } - - if ( result != InvalidResult && hndlr != NULL ) - { - wxThreadEvent* event = new wxThreadEvent(wxSocketAction); - event->SetPayload(sock); - event->SetInt(result); - wxQueueEvent(hndlr,event); - m_disabledList.insert(sock); - } - } - } + wxEventLoopSourceHandler* srcHandler = source->GetHandler(); + delete source; + delete srcHandler; } +SocketPollerImpl* SocketPollerImpl::Create(wxEvtHandler* hndlr) +{ + return new SourceSocketPoller(hndlr); +} +#endif // // wxWebSessionCURL @@ -946,10 +892,11 @@ wxWebSessionCURL::wxWebSessionCURL() : ms_activeSessions++; - m_socketPoller = new SocketPoller(); + m_socketPoller = new SocketPoller(this); m_timeoutTimer.SetOwner(this); Bind(wxEVT_TIMER, &wxWebSessionCURL::TimeoutNotification, this); - Bind(wxSocketAction, &wxWebSessionCURL::ProcessSocketPollerResult, this); + Bind(wxEVT_SOCKET_POLLER_RESULT, + &wxWebSessionCURL::ProcessSocketPollerResult, this); } wxWebSessionCURL::~wxWebSessionCURL() @@ -1139,19 +1086,20 @@ void wxWebSessionCURL::ProcessTimeoutNotification() static int CurlPoll2SocketPoller(int what) { - int pollAction = SocketPoller::InvalidAction; + int pollAction = SocketPoller::INVALID_ACTION; if ( what == CURL_POLL_IN ) { - pollAction = SocketPoller::PollForRead ; + pollAction = SocketPoller::POLL_FOR_READ ; } else if ( what == CURL_POLL_OUT ) { - pollAction = SocketPoller::PollForWrite; + pollAction = SocketPoller::POLL_FOR_WRITE; } else if ( what == CURL_POLL_INOUT ) { - pollAction = SocketPoller::PollForRead | SocketPoller::PollForWrite; + pollAction = + SocketPoller::POLL_FOR_READ | SocketPoller::POLL_FOR_WRITE; } return pollAction; @@ -1169,7 +1117,7 @@ void wxWebSessionCURL::ProcessSocketCallback(curl_socket_t s, int what) case CURL_POLL_OUT: wxFALLTHROUGH; case CURL_POLL_INOUT: - m_socketPoller->StartPolling(s, CurlPoll2SocketPoller(what), this); + m_socketPoller->StartPolling(s, CurlPoll2SocketPoller(what)); break; case CURL_POLL_REMOVE: m_socketPoller->StopPolling(s); @@ -1184,17 +1132,17 @@ static int SocketPollerResult2CurlSelect(int socketEventFlag) { int curlSelect = 0; - if ( socketEventFlag & SocketPoller::ReadyForRead ) + if ( socketEventFlag & SocketPoller::READY_FOR_READ ) { curlSelect |= CURL_CSELECT_IN; } - if ( socketEventFlag & SocketPoller::ReadyForWrite ) + if ( socketEventFlag & SocketPoller::READY_FOR_WRITE ) { curlSelect |= CURL_CSELECT_OUT; } - if ( socketEventFlag & SocketPoller::HasError ) + if ( socketEventFlag & SocketPoller::HAS_ERROR ) { curlSelect |= CURL_CSELECT_ERR; } From 4c8f9ff34dcea73ad497919d8cec74892e2475ff Mon Sep 17 00:00:00 2001 From: New Pagodi Date: Sun, 7 Feb 2021 20:00:38 -0600 Subject: [PATCH 10/12] Cancel a transfer if socket poller operation fails When using the socket poller implementation using wxEventLoopSource objects to monitor sockets, the operation wxEventLoopBase::AddSourceForFD(... can sometimes return NULL. In these cases the socket will not be monitored as needed. The only option seems to be to cancel the transfer and report a failure --- include/wx/private/webrequest_curl.h | 3 +- src/common/webrequest_curl.cpp | 42 ++++++++++++++++++++++++---- 2 files changed, 39 insertions(+), 6 deletions(-) diff --git a/include/wx/private/webrequest_curl.h b/include/wx/private/webrequest_curl.h index b106cef287..63659a6a7f 100644 --- a/include/wx/private/webrequest_curl.h +++ b/include/wx/private/webrequest_curl.h @@ -164,9 +164,10 @@ private: void ProcessTimerCallback(long); void TimeoutNotification(wxTimerEvent&); void ProcessTimeoutNotification(); - void ProcessSocketCallback(curl_socket_t, int); + void ProcessSocketCallback(CURL*, curl_socket_t, int); void ProcessSocketPollerResult(wxThreadEvent&); void CheckForCompletedTransfers(); + void FailRequest(CURL*, const wxString&); void StopTransfer(CURL*); WX_DECLARE_HASH_MAP(CURL*, wxWebRequestCURL*, wxPointerHash, \ diff --git a/src/common/webrequest_curl.cpp b/src/common/webrequest_curl.cpp index be444d4f0e..60af20dd17 100644 --- a/src/common/webrequest_curl.cpp +++ b/src/common/webrequest_curl.cpp @@ -1036,11 +1036,11 @@ int wxWebSessionCURL::TimerCallback(CURLM* WXUNUSED(multi), long timeoutms, return 0; } -int wxWebSessionCURL::SocketCallback(CURL* WXUNUSED(easy), curl_socket_t sock, - int what, void* userp, void* WXUNUSED(sp)) +int wxWebSessionCURL::SocketCallback(CURL* curl, curl_socket_t sock, int what, + void* userp, void* WXUNUSED(sp)) { wxWebSessionCURL* session = reinterpret_cast(userp); - session->ProcessSocketCallback(sock, what); + session->ProcessSocketCallback(curl, sock, what); return CURLM_OK; }; @@ -1105,7 +1105,8 @@ static int CurlPoll2SocketPoller(int what) return pollAction; } -void wxWebSessionCURL::ProcessSocketCallback(curl_socket_t s, int what) +void wxWebSessionCURL::ProcessSocketCallback(CURL* curl, curl_socket_t s, + int what) { // Have the socket poller start or stop monitoring a socket depending of // the value of what. @@ -1117,7 +1118,23 @@ void wxWebSessionCURL::ProcessSocketCallback(curl_socket_t s, int what) case CURL_POLL_OUT: wxFALLTHROUGH; case CURL_POLL_INOUT: - m_socketPoller->StartPolling(s, CurlPoll2SocketPoller(what)); + { + int pollAction = CurlPoll2SocketPoller(what); + bool socketIsMonitored = m_socketPoller->StartPolling(s, + pollAction); + + if ( !socketIsMonitored ) + { + TransferSet::iterator it = m_activeTransfers.find(curl); + + if ( it != m_activeTransfers.end() ) + { + FailRequest(curl, + "wxWebSession failed to monitor a socket for this " + "transfer"); + } + } + } break; case CURL_POLL_REMOVE: m_socketPoller->StopPolling(s); @@ -1185,6 +1202,21 @@ void wxWebSessionCURL::CheckForCompletedTransfers() } } +void wxWebSessionCURL::FailRequest(CURL* curl,const wxString& msg) +{ + TransferSet::iterator it = m_activeTransfers.find(curl); + + if ( it != m_activeTransfers.end() ) + { + wxWebRequestCURL* request = it->second; + m_activeTransfers.erase(it); + curl_multi_remove_handle(m_handle, curl); + StopTransfer(curl); + + request->SetState(wxWebRequest::State_Failed, msg); + } +} + void wxWebSessionCURL::StopTransfer(CURL* curl) { curl_socket_t activeSocket; From 71054995610b0b4d48a5e4de1e032d10beeb594a Mon Sep 17 00:00:00 2001 From: New Pagodi Date: Sun, 7 Feb 2021 23:45:33 -0600 Subject: [PATCH 11/12] Make wxWebRequestCURL's progress callbacks static --- src/common/webrequest_curl.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/common/webrequest_curl.cpp b/src/common/webrequest_curl.cpp index 60af20dd17..fdd6bc6bcf 100644 --- a/src/common/webrequest_curl.cpp +++ b/src/common/webrequest_curl.cpp @@ -65,10 +65,10 @@ static size_t wxCURLHeader(char *buffer, size_t size, size_t nitems, void *userd return static_cast(userdata)->CURLOnHeader(buffer, size * nitems); } -int wxCURLXferInfo(void* clientp, curl_off_t dltotal, - curl_off_t WXUNUSED(dlnow), - curl_off_t WXUNUSED(ultotal), - curl_off_t WXUNUSED(ulnow)) +static int wxCURLXferInfo(void* clientp, curl_off_t dltotal, + curl_off_t WXUNUSED(dlnow), + curl_off_t WXUNUSED(ultotal), + curl_off_t WXUNUSED(ulnow)) { wxCHECK_MSG( clientp, 0, "invalid curl progress callback data" ); @@ -76,8 +76,8 @@ int wxCURLXferInfo(void* clientp, curl_off_t dltotal, return response->CURLOnProgress(dltotal); } -int wxCURLProgress(void* clientp, double dltotal, double dlnow, double ultotal, - double ulnow) +static int wxCURLProgress(void* clientp, double dltotal, double dlnow, + double ultotal, double ulnow) { return wxCURLXferInfo(clientp, static_cast(dltotal), static_cast(dlnow), From ab63e7392023216405aa7f74df3e8adb1a4b1759 Mon Sep 17 00:00:00 2001 From: New Pagodi Date: Mon, 8 Feb 2021 12:32:32 -0600 Subject: [PATCH 12/12] Record active sockets in wxWebSessionCURL MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If a wxWebRequestCURL object is canceled or deleted before its transfer is complete, we need to manually close its active socket. Record each transfer’s active socket in wxWebSessionCURL::SocketCallback so can use it if needed. Previously, this was done using curl_easy_getinfo, but this required some compile time and run time checks and could fail in some specific cases. Recording the socket ourselves significantly simplifies the code and should always work. --- include/wx/private/webrequest_curl.h | 7 +- src/common/webrequest_curl.cpp | 109 +++++++++------------------ 2 files changed, 42 insertions(+), 74 deletions(-) diff --git a/include/wx/private/webrequest_curl.h b/include/wx/private/webrequest_curl.h index 63659a6a7f..81b7aecdd7 100644 --- a/include/wx/private/webrequest_curl.h +++ b/include/wx/private/webrequest_curl.h @@ -168,12 +168,17 @@ private: void ProcessSocketPollerResult(wxThreadEvent&); void CheckForCompletedTransfers(); void FailRequest(CURL*, const wxString&); - void StopTransfer(CURL*); + void StopActiveTransfer(CURL*); + void RemoveActiveSocket(CURL*); WX_DECLARE_HASH_MAP(CURL*, wxWebRequestCURL*, wxPointerHash, \ wxPointerEqual, TransferSet); + WX_DECLARE_HASH_MAP(CURL*, curl_socket_t, wxPointerHash, \ + wxPointerEqual, CurlSocketMap); + TransferSet m_activeTransfers; + CurlSocketMap m_activeSockets; SocketPoller* m_socketPoller; wxTimer m_timeoutTimer; diff --git a/src/common/webrequest_curl.cpp b/src/common/webrequest_curl.cpp index fdd6bc6bcf..17dc62b13d 100644 --- a/src/common/webrequest_curl.cpp +++ b/src/common/webrequest_curl.cpp @@ -23,7 +23,7 @@ #endif #include "wx/uri.h" -#include "wx/socket.h" +#include "wx/private/socket.h" #include "wx/evtloop.h" #ifdef __WINDOWS__ @@ -965,35 +965,18 @@ bool wxWebSessionCURL::StartRequest(wxWebRequestCURL & request) void wxWebSessionCURL::CancelRequest(wxWebRequestCURL* request) { + // If this transfer is currently active, stop it. CURL* curl = request->GetHandle(); - TransferSet::iterator it = m_activeTransfers.find(curl); - - if ( it != m_activeTransfers.end() ) - { - m_activeTransfers.erase(it); - } - - curl_multi_remove_handle(m_handle, curl); - StopTransfer(curl); + StopActiveTransfer(curl); request->SetState(wxWebRequest::State_Cancelled); } void wxWebSessionCURL::RequestHasTerminated(wxWebRequestCURL* request) { + // If this transfer is currently active, stop it. CURL* curl = request->GetHandle(); - TransferSet::iterator it = m_activeTransfers.find(curl); - - if ( it != m_activeTransfers.end() ) - { - // The transfer the CURL handle is performing is still in progress, but - // the web request object it belongs to is being deleted. Since the - // next step will call curl_easy_cleanup and any calls on the CURL - // handle after cleanup are illegal, remove it from the CURLM - // multihandle now. - curl_multi_remove_handle(m_handle, curl); - m_activeTransfers.erase(it); - } + StopActiveTransfer(curl); curl_easy_cleanup(curl); } @@ -1119,6 +1102,8 @@ void wxWebSessionCURL::ProcessSocketCallback(CURL* curl, curl_socket_t s, wxFALLTHROUGH; case CURL_POLL_INOUT: { + m_activeSockets[curl] = s; + int pollAction = CurlPoll2SocketPoller(what); bool socketIsMonitored = m_socketPoller->StartPolling(s, pollAction); @@ -1138,6 +1123,7 @@ void wxWebSessionCURL::ProcessSocketCallback(CURL* curl, curl_socket_t s, break; case CURL_POLL_REMOVE: m_socketPoller->StopPolling(s); + RemoveActiveSocket(curl); break; default: wxLogDebug("Unknown socket action in ProcessSocketCallback"); @@ -1197,6 +1183,7 @@ void wxWebSessionCURL::CheckForCompletedTransfers() curl_multi_remove_handle(m_handle, curl); request->HandleCompletion(); m_activeTransfers.erase(it); + RemoveActiveSocket(curl); } } } @@ -1209,74 +1196,50 @@ void wxWebSessionCURL::FailRequest(CURL* curl,const wxString& msg) if ( it != m_activeTransfers.end() ) { wxWebRequestCURL* request = it->second; - m_activeTransfers.erase(it); - curl_multi_remove_handle(m_handle, curl); - StopTransfer(curl); + StopActiveTransfer(curl); request->SetState(wxWebRequest::State_Failed, msg); } } -void wxWebSessionCURL::StopTransfer(CURL* curl) +void wxWebSessionCURL::StopActiveTransfer(CURL* curl) { - curl_socket_t activeSocket; - bool closeActiveSocket = true; - bool useLastSocket = false; + TransferSet::iterator it = m_activeTransfers.find(curl); -#if CURL_AT_LEAST_VERSION(7, 45, 0) - if ( CurlRuntimeAtLeastVersion(7, 45, 0) ) + if ( it != m_activeTransfers.end() ) { - CURLcode code = curl_easy_getinfo(curl, CURLINFO_ACTIVESOCKET, - &activeSocket); + // Record the current active socket now since it should be removed from + // the m_activeSockets map when we call curl_multi_remove_handle. + curl_socket_t activeSocket = CURL_SOCKET_BAD; + CurlSocketMap::iterator it2 = m_activeSockets.find(curl); - if ( code != CURLE_OK || activeSocket == CURL_SOCKET_BAD ) + if ( it2 != m_activeSockets.end() ) { - closeActiveSocket = false; + activeSocket = it2->second; } - } - else - { - useLastSocket = true; - } -#else - useLastSocket = true; -#endif //CURL_AT_LEAST_VERSION(7, 45, 0) - // CURLINFO_ACTIVESOCKET is not available either at compile time or run - // time. So we must use the older CURLINFO_LASTSOCKET instead. - if ( useLastSocket ) - { - #ifdef __WIN64__ - // CURLINFO_LASTSOCKET won't work on 64 bit windows because it - // uses a long to retrive the socket. However sockets will be 64 - // bit values. In this case there is nothing we can do. - closeActiveSocket = false; - #endif //__WIN64__ + // Remove the CURL easy handle from the CURLM multi handle. + curl_multi_remove_handle(m_handle, curl); - if ( closeActiveSocket ) + // If the transfer was active, close its socket. + if ( activeSocket != CURL_SOCKET_BAD ) { - long longSocket; - CURLcode code = curl_easy_getinfo(curl, CURLINFO_LASTSOCKET, - &longSocket); - - if ( code == CURLE_OK && longSocket!= -1 ) - { - activeSocket = static_cast(longSocket); - } - else - { - closeActiveSocket = false; - } + wxCloseSocket(activeSocket); } - } - if ( closeActiveSocket ) + // Clean up the maps. + RemoveActiveSocket(curl); + m_activeTransfers.erase(it); + } +} + +void wxWebSessionCURL::RemoveActiveSocket(CURL* curl) +{ + CurlSocketMap::iterator it = m_activeSockets.find(curl); + + if ( it != m_activeSockets.end() ) { - #ifdef __WINDOWS__ - closesocket(activeSocket); - #else - close(activeSocket); - #endif + m_activeSockets.erase(it); } }