use buffered streams to reduce the number of TCP packets used per IPC command from up to 7 to 1 for reasonably sized payloads, this dramatically (by 150 times for the IPC benchmark on a LAN) increases performance; also centralize all the streams used in a single wxIPCSocketStreams class and allocate only it on the heap instead of doing it for all of the streams

git-svn-id: https://svn.wxwidgets.org/svn/wx/wxWidgets/trunk@56584 c3d73ce0-8a6f-49c7-b76d-6d57e0e08775
This commit is contained in:
Vadim Zeitlin
2008-10-29 16:45:55 +00:00
parent f42c1512dd
commit 8aea37a9df
3 changed files with 340 additions and 205 deletions

View File

@@ -316,6 +316,7 @@ All:
All (Unix): All (Unix):
- Added wx-config --optional-libs command line option (John Labenski). - Added wx-config --optional-libs command line option (John Labenski).
- Noticeably (by a factor of ~150) improve wxIPC classes performance.
All (GUI): All (GUI):

View File

@@ -52,6 +52,8 @@
class WXDLLIMPEXP_FWD_NET wxTCPServer; class WXDLLIMPEXP_FWD_NET wxTCPServer;
class WXDLLIMPEXP_FWD_NET wxTCPClient; class WXDLLIMPEXP_FWD_NET wxTCPClient;
class wxIPCSocketStreams;
class WXDLLIMPEXP_NET wxTCPConnection : public wxConnectionBase class WXDLLIMPEXP_NET wxTCPConnection : public wxConnectionBase
{ {
public: public:
@@ -85,11 +87,19 @@ protected:
wxIPCFormat format); wxIPCFormat format);
wxSocketBase *m_sock; // notice that all the members below are only initialized once the
wxSocketStream *m_sockstrm; // connection is made, i.e. in MakeConnection() for the client objects and
wxDataInputStream *m_codeci; // after OnAcceptConnection() in the server ones
wxDataOutputStream *m_codeco;
wxString m_topic; // the underlying socket (wxSocketClient for IPC client and wxSocketServer
// for IPC server)
wxSocketBase *m_sock;
// various streams that we use
wxIPCSocketStreams *m_streams;
// the topic of this connection
wxString m_topic;
private: private:
// common part of both ctors // common part of both ctors

View File

@@ -6,6 +6,7 @@
// Guillermo Rodriguez (updated for wxSocket v2) Jan 2000 // Guillermo Rodriguez (updated for wxSocket v2) Jan 2000
// (callbacks deprecated) Mar 2000 // (callbacks deprecated) Mar 2000
// Vadim Zeitlin (added support for Unix sockets) Apr 2002 // Vadim Zeitlin (added support for Unix sockets) Apr 2002
// (use buffering, many fixes/cleanup) Oct 2008
// Created: 1993 // Created: 1993
// RCS-ID: $Id$ // RCS-ID: $Id$
// Copyright: (c) Julian Smart 1993 // Copyright: (c) Julian Smart 1993
@@ -49,8 +50,11 @@
// macros and constants // macros and constants
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
namespace
{
// Message codes // Message codes
enum enum IPCCode
{ {
IPC_EXECUTE = 1, IPC_EXECUTE = 1,
IPC_REQUEST, IPC_REQUEST,
@@ -65,8 +69,7 @@ enum
IPC_DISCONNECT IPC_DISCONNECT
}; };
// All sockets will be created with the following flags } // anonymous namespace
#define SCKIPC_FLAGS (wxSOCKET_WAITALL|wxSOCKET_REUSEADDR)
// headers needed for umask() // headers needed for umask()
#ifdef __UNIX_LIKE__ #ifdef __UNIX_LIKE__
@@ -131,6 +134,182 @@ enum
static wxTCPEventHandler *gs_handler = NULL; static wxTCPEventHandler *gs_handler = NULL;
// --------------------------------------------------------------------------
// wxIPCSocketStreams
// --------------------------------------------------------------------------
#define USE_BUFFER
// this class contains the various (related) streams used by wxTCPConnection
// and also provides a way to read from the socket stream directly
//
// for writing to the stream use the IPCOutput class below
class wxIPCSocketStreams
{
public:
// ctor initializes all the streams on top of the given socket
//
// note that we use a bigger than default buffer size which matches the
// typical Ethernet MTU
wxIPCSocketStreams(wxSocketBase& sock)
: m_socketStream(sock),
#ifdef USE_BUFFER
m_bufferedOut(m_socketStream, 1500),
#else
m_bufferedOut(m_socketStream),
#endif
m_dataIn(m_socketStream),
m_dataOut(m_bufferedOut)
{
}
// expose the IO methods needed by IPC code (notice that writing is only
// done via IPCOutput)
// flush output
void Flush()
{
#ifdef USE_BUFFER
m_bufferedOut.Sync();
#endif
}
// simple wrappers around the functions with the same name in
// wxDataInputStream
wxUint8 Read8()
{
Flush();
return m_dataIn.Read8();
}
wxUint32 Read32()
{
Flush();
return m_dataIn.Read32();
}
wxString ReadString()
{
Flush();
return m_dataIn.ReadString();
}
// read arbitrary (size-prepended) data
//
// connection parameter is needed to call its GetBufferAtLeast() method
void *ReadData(wxConnectionBase *conn, size_t *size)
{
Flush();
wxCHECK_MSG( conn, NULL, "NULL connection parameter" );
wxCHECK_MSG( size, NULL, "NULL size parameter" );
*size = Read32();
void * const data = conn->GetBufferAtLeast(*size);
wxCHECK_MSG( data, NULL, "IPC buffer allocation failed" );
m_socketStream.Read(data, *size);
return data;
}
// same as above but for data preceded by the format
void *
ReadFormatData(wxConnectionBase *conn, wxIPCFormat *format, size_t *size)
{
wxCHECK_MSG( format, NULL, "NULL format parameter" );
*format = static_cast<wxIPCFormat>(Read8());
return ReadData(conn, size);
}
// these methods are only used by IPCOutput and not directly
wxDataOutputStream& GetDataOut() { return m_dataOut; }
wxOutputStream& GetUnformattedOut() { return m_bufferedOut; }
private:
// this is the low-level underlying stream using the connection socket
wxSocketStream m_socketStream;
// the buffered stream is used to avoid writing all pieces of an IPC
// request to the socket one by one but to instead do it all at once when
// we're done with it
#ifdef USE_BUFFER
wxBufferedOutputStream m_bufferedOut;
#else
wxOutputStream& m_bufferedOut;
#endif
// finally the data streams are used to be able to write typed data into
// the above streams easily
wxDataInputStream m_dataIn;
wxDataOutputStream m_dataOut;
DECLARE_NO_COPY_CLASS(wxIPCSocketStreams)
};
namespace
{
// an object of this class should be instantiated on the stack to write to the
// underlying socket stream
//
// this class is intentionally separated from wxIPCSocketStreams to ensure that
// Flush() is always called
class IPCOutput
{
public:
// construct an object associated with the given streams (which must have
// life time greater than ours as we keep a reference to it)
IPCOutput(wxIPCSocketStreams *streams)
: m_streams(*streams)
{
wxASSERT_MSG( streams, "NULL streams pointer" );
}
// dtor calls Flush() really sending the IPC data to the network
~IPCOutput() { m_streams.Flush(); }
// write a byte
void Write8(wxUint8 i)
{
m_streams.GetDataOut().Write8(i);
}
// write the reply code and a string
void Write(IPCCode code, const wxString& str)
{
Write8(code);
m_streams.GetDataOut().WriteString(str);
}
// write the reply code, a string and a format in this order
void Write(IPCCode code, const wxString& str, wxIPCFormat format)
{
Write(code, str);
Write8(format);
}
// write arbitrary data
void WriteData(const void *data, size_t size)
{
m_streams.GetDataOut().Write32(size);
m_streams.GetUnformattedOut().Write(data, size);
}
private:
wxIPCSocketStreams& m_streams;
DECLARE_NO_COPY_CLASS(IPCOutput)
};
} // anonymous namespace
// ========================================================================== // ==========================================================================
// implementation // implementation
// ========================================================================== // ==========================================================================
@@ -155,31 +334,26 @@ bool wxTCPClient::ValidHost(const wxString& host)
return addr.Hostname(host); return addr.Hostname(host);
} }
wxConnectionBase *wxTCPClient::MakeConnection (const wxString& host, wxConnectionBase *wxTCPClient::MakeConnection(const wxString& host,
const wxString& serverName, const wxString& serverName,
const wxString& topic) const wxString& topic)
{ {
wxSockAddress *addr = GetAddressFromName(serverName, host); wxSockAddress *addr = GetAddressFromName(serverName, host);
if ( !addr ) if ( !addr )
return NULL; return NULL;
wxSocketClient *client = new wxSocketClient(SCKIPC_FLAGS); wxSocketClient * const client = new wxSocketClient(wxSOCKET_WAITALL);
wxSocketStream *stream = new wxSocketStream(*client); wxIPCSocketStreams * const streams = new wxIPCSocketStreams(*client);
wxDataInputStream *data_is = new wxDataInputStream(*stream);
wxDataOutputStream *data_os = new wxDataOutputStream(*stream);
bool ok = client->Connect(*addr); bool ok = client->Connect(*addr);
delete addr; delete addr;
if ( ok ) if ( ok )
{ {
unsigned char msg;
// Send topic name, and enquire whether this has succeeded // Send topic name, and enquire whether this has succeeded
data_os->Write8(IPC_CONNECT); IPCOutput(streams).Write(IPC_CONNECT, topic);
data_os->WriteString(topic);
msg = data_is->Read8(); unsigned char msg = streams->Read8();
// OK! Confirmation. // OK! Confirmation.
if (msg == IPC_CONNECT) if (msg == IPC_CONNECT)
@@ -193,9 +367,7 @@ wxConnectionBase *wxTCPClient::MakeConnection (const wxString& host,
{ {
connection->m_topic = topic; connection->m_topic = topic;
connection->m_sock = client; connection->m_sock = client;
connection->m_sockstrm = stream; connection->m_streams = streams;
connection->m_codeci = data_is;
connection->m_codeco = data_os;
client->SetEventHandler(*gs_handler, _CLIENT_ONREQUEST_ID); client->SetEventHandler(*gs_handler, _CLIENT_ONREQUEST_ID);
client->SetClientData(connection); client->SetClientData(connection);
client->SetNotify(wxSOCKET_INPUT_FLAG | wxSOCKET_LOST_FLAG); client->SetNotify(wxSOCKET_INPUT_FLAG | wxSOCKET_LOST_FLAG);
@@ -212,9 +384,7 @@ wxConnectionBase *wxTCPClient::MakeConnection (const wxString& host,
} }
// Something went wrong, delete everything // Something went wrong, delete everything
delete data_is; delete streams;
delete data_os;
delete stream;
client->Destroy(); client->Destroy();
return NULL; return NULL;
@@ -273,8 +443,10 @@ bool wxTCPServer::Create(const wxString& serverName)
} }
#endif // __UNIX_LIKE__ #endif // __UNIX_LIKE__
// Create a socket listening on the specified port // Create a socket listening on the specified port (reusing it to allow
m_server = new wxSocketServer(*addr, SCKIPC_FLAGS); // restarting the server listening on the same port as was used by the
// previous instance of this server)
m_server = new wxSocketServer(*addr, wxSOCKET_WAITALL | wxSOCKET_REUSEADDR);
#ifdef __UNIX_LIKE__ #ifdef __UNIX_LIKE__
if ( addr->Type() == wxSockAddress::UNIX ) if ( addr->Type() == wxSockAddress::UNIX )
@@ -336,10 +508,8 @@ wxTCPServer::OnAcceptConnection(const wxString& WXUNUSED(topic))
void wxTCPConnection::Init() void wxTCPConnection::Init()
{ {
m_sock = NULL; m_sock = NULL;
m_sockstrm = NULL; m_streams = NULL;
m_codeci = NULL;
m_codeco = NULL;
} }
wxTCPConnection::~wxTCPConnection() wxTCPConnection::~wxTCPConnection()
@@ -352,10 +522,7 @@ wxTCPConnection::~wxTCPConnection()
m_sock->Destroy(); m_sock->Destroy();
} }
/* Delete after destroy */ delete m_streams;
wxDELETE(m_codeci);
wxDELETE(m_codeco);
wxDELETE(m_sockstrm);
} }
void wxTCPConnection::Compress(bool WXUNUSED(on)) void wxTCPConnection::Compress(bool WXUNUSED(on))
@@ -370,7 +537,7 @@ bool wxTCPConnection::Disconnect()
return true; return true;
// Send the disconnect message to the peer. // Send the disconnect message to the peer.
m_codeco->Write8(IPC_DISCONNECT); IPCOutput(m_streams).Write8(IPC_DISCONNECT);
if ( m_sock ) if ( m_sock )
{ {
@@ -391,11 +558,11 @@ bool wxTCPConnection::DoExecute(const void *data,
return false; return false;
// Prepare EXECUTE message // Prepare EXECUTE message
m_codeco->Write8(IPC_EXECUTE); IPCOutput out(m_streams);
m_codeco->Write8(format); out.Write8(IPC_EXECUTE);
out.Write8(format);
m_codeco->Write32(size); out.WriteData(data, size);
m_sockstrm->Write(data, size);
return true; return true;
} }
@@ -407,24 +574,13 @@ const void *wxTCPConnection::Request(const wxString& item,
if ( !m_sock->IsConnected() ) if ( !m_sock->IsConnected() )
return NULL; return NULL;
m_codeco->Write8(IPC_REQUEST); IPCOutput(m_streams).Write(IPC_REQUEST, item, format);
m_codeco->WriteString(item);
m_codeco->Write8(format);
int ret = m_codeci->Read8(); int ret = m_streams->Read8();
if ( ret == IPC_FAIL ) if ( ret == IPC_FAIL )
return NULL; return NULL;
size_t s = m_codeci->Read32(); return m_streams->ReadData(this, size);
void *data = GetBufferAtLeast( s );
wxASSERT_MSG(data != NULL,
_T("Buffer too small in wxTCPConnection::Request") );
m_sockstrm->Read(data, s);
if (size)
*size = s;
return data;
} }
bool wxTCPConnection::DoPoke(const wxString& item, bool wxTCPConnection::DoPoke(const wxString& item,
@@ -435,25 +591,21 @@ bool wxTCPConnection::DoPoke(const wxString& item,
if ( !m_sock->IsConnected() ) if ( !m_sock->IsConnected() )
return false; return false;
m_codeco->Write8(IPC_POKE); IPCOutput out(m_streams);
m_codeco->WriteString(item); out.Write(IPC_POKE, item, format);
m_codeco->Write8(format); out.WriteData(data, size);
m_codeco->Write32(size);
m_sockstrm->Write(data, size);
return true; return true;
} }
bool wxTCPConnection::StartAdvise (const wxString& item) bool wxTCPConnection::StartAdvise(const wxString& item)
{ {
if ( !m_sock->IsConnected() ) if ( !m_sock->IsConnected() )
return false; return false;
m_codeco->Write8(IPC_ADVISE_START); IPCOutput(m_streams).Write(IPC_ADVISE_START, item);
m_codeco->WriteString(item);
int ret = m_codeci->Read8(); int ret = m_streams->Read8();
if (ret != IPC_FAIL) if (ret != IPC_FAIL)
return true; return true;
else else
@@ -465,10 +617,9 @@ bool wxTCPConnection::StopAdvise (const wxString& item)
if ( !m_sock->IsConnected() ) if ( !m_sock->IsConnected() )
return false; return false;
m_codeco->Write8(IPC_ADVISE_STOP); IPCOutput(m_streams).Write(IPC_ADVISE_STOP, item);
m_codeco->WriteString(item);
int ret = m_codeci->Read8(); int ret = m_streams->Read8();
if (ret != IPC_FAIL) if (ret != IPC_FAIL)
return true; return true;
@@ -485,12 +636,9 @@ bool wxTCPConnection::DoAdvise(const wxString& item,
if ( !m_sock->IsConnected() ) if ( !m_sock->IsConnected() )
return false; return false;
m_codeco->Write8(IPC_ADVISE); IPCOutput out(m_streams);
m_codeco->WriteString(item); out.Write(IPC_ADVISE, item, format);
m_codeco->Write8(format); out.WriteData(data, size);
m_codeco->Write32(size);
m_sockstrm->Write(data, size);
return true; return true;
} }
@@ -517,12 +665,6 @@ void wxTCPEventHandler::Client_OnRequest(wxSocketEvent &event)
if (!connection) if (!connection)
return; return;
wxDataInputStream *codeci;
wxDataOutputStream *codeco;
wxSocketStream *sockstrm;
wxString topic_name = connection->m_topic;
wxString item;
// We lost the connection: destroy everything // We lost the connection: destroy everything
if (evt == wxSOCKET_LOST) if (evt == wxSOCKET_LOST)
{ {
@@ -533,134 +675,120 @@ void wxTCPEventHandler::Client_OnRequest(wxSocketEvent &event)
} }
// Receive message number. // Receive message number.
codeci = connection->m_codeci; wxIPCSocketStreams * const streams = connection->m_streams;
codeco = connection->m_codeco;
sockstrm = connection->m_sockstrm;
int msg = codeci->Read8();
switch (msg) const wxString topic = connection->m_topic;
wxString item;
switch ( const int msg = streams->Read8() )
{ {
case IPC_EXECUTE: case IPC_EXECUTE:
{ {
void *data;
size_t size;
wxIPCFormat format; wxIPCFormat format;
size_t size;
void * const
data = streams->ReadFormatData(connection, &format, &size);
format = (wxIPCFormat)codeci->Read8(); connection->OnExecute(topic, data, size, format);
size = codeci->Read32();
data = connection->GetBufferAtLeast( size );
wxASSERT_MSG(data != NULL,
"Buffer too small in wxTCPEventHandler::Client_OnRequest" );
sockstrm->Read(data, size);
connection->OnExecute (topic_name, data, size, format);
break;
} }
break;
case IPC_ADVISE: case IPC_ADVISE:
{ {
item = codeci->ReadString(); item = streams->ReadString();
wxIPCFormat format = (wxIPCFormat)codeci->Read8();
size_t size = codeci->Read32();
void *data = connection->GetBufferAtLeast( size );
wxASSERT_MSG(data != NULL,
"Buffer too small in wxTCPEventHandler::Client_OnRequest" );
sockstrm->Read(data, size);
connection->OnAdvise (topic_name, item, data, size, format); wxIPCFormat format;
size_t size;
void * const
data = streams->ReadFormatData(connection, &format, &size);
break; connection->OnAdvise(topic, item, data, size, format);
} }
break;
case IPC_ADVISE_START: case IPC_ADVISE_START:
{ {
item = codeci->ReadString(); item = streams->ReadString();
bool ok = connection->OnStartAdvise (topic_name, item); IPCOutput(streams).Write8(connection->OnStartAdvise(topic, item)
if (ok) ? IPC_ADVISE_START
codeco->Write8(IPC_ADVISE_START); : IPC_FAIL);
else
codeco->Write8(IPC_FAIL);
break;
} }
break;
case IPC_ADVISE_STOP: case IPC_ADVISE_STOP:
{ {
item = codeci->ReadString(); item = streams->ReadString();
bool ok = connection->OnStopAdvise (topic_name, item); IPCOutput(streams).Write8(connection->OnStopAdvise(topic, item)
if (ok) ? IPC_ADVISE_STOP
codeco->Write8(IPC_ADVISE_STOP); : IPC_FAIL);
else
codeco->Write8(IPC_FAIL);
break;
} }
break;
case IPC_POKE: case IPC_POKE:
{ {
item = codeci->ReadString(); item = streams->ReadString();
wxIPCFormat format = (wxIPCFormat)codeci->Read8(); wxIPCFormat format = (wxIPCFormat)streams->Read8();
size_t size = codeci->Read32();
void *data = connection->GetBufferAtLeast( size );
wxASSERT_MSG(data != NULL,
"Buffer too small in wxTCPEventHandler::Client_OnRequest" );
sockstrm->Read(data, size);
connection->OnPoke (topic_name, item, data, size, format); size_t size;
void * const data = streams->ReadData(connection, &size);
break; connection->OnPoke(topic, item, data, size, format);
} }
break;
case IPC_REQUEST: case IPC_REQUEST:
{ {
wxIPCFormat format; item = streams->ReadString();
item = codeci->ReadString(); wxIPCFormat format = (wxIPCFormat)streams->Read8();
format = (wxIPCFormat)codeci->Read8();
size_t user_size = wxNO_LEN; size_t user_size = wxNO_LEN;
const void *user_data = connection->OnRequest(topic_name, const void *user_data = connection->OnRequest(topic,
item, item,
&user_size, &user_size,
format); format);
if (user_data) if ( !user_data )
{ {
codeco->Write8(IPC_REQUEST_REPLY); IPCOutput(streams).Write8(IPC_FAIL);
break;
if (user_size == wxNO_LEN)
{
switch (format)
{
case wxIPC_TEXT:
case wxIPC_UTF8TEXT:
user_size = strlen((const char *)user_data) + 1; // includes final NUL
break;
case wxIPC_UNICODETEXT:
user_size = (wcslen((const wchar_t *)user_data) + 1) * sizeof(wchar_t); // includes final NUL
break;
default:
user_size = 0;
}
}
codeco->Write32(user_size);
sockstrm->Write(user_data, user_size);
} }
else
codeco->Write8(IPC_FAIL);
break; IPCOutput out(streams);
out.Write8(IPC_REQUEST_REPLY);
if ( user_size == wxNO_LEN )
{
switch ( format )
{
case wxIPC_TEXT:
case wxIPC_UTF8TEXT:
user_size = strlen((const char *)user_data) + 1; // includes final NUL
break;
case wxIPC_UNICODETEXT:
user_size = (wcslen((const wchar_t *)user_data) + 1) * sizeof(wchar_t); // includes final NUL
break;
default:
user_size = 0;
}
}
out.WriteData(user_data, user_size);
} }
break;
case IPC_DISCONNECT: case IPC_DISCONNECT:
{ sock->Notify(false);
sock->Notify(false); sock->Close();
sock->Close(); connection->SetConnected(false);
connection->SetConnected(false); connection->OnDisconnect();
connection->OnDisconnect(); break;
break;
}
default: default:
codeco->Write8(IPC_FAIL); wxLogDebug("Unknown message code %d received.", msg);
IPCOutput(streams).Write8(IPC_FAIL);
break; break;
} }
} }
@@ -689,52 +817,48 @@ void wxTCPEventHandler::Server_OnRequest(wxSocketEvent &event)
return; return;
} }
wxSocketStream *stream = new wxSocketStream(*sock); wxIPCSocketStreams *streams = new wxIPCSocketStreams(*sock);
wxDataInputStream *codeci = new wxDataInputStream(*stream);
wxDataOutputStream *codeco = new wxDataOutputStream(*stream);
int msg;
msg = codeci->Read8();
if (msg == IPC_CONNECT)
{ {
wxString topic_name; IPCOutput out(streams);
topic_name = codeci->ReadString();
wxTCPConnection *new_connection = const int msg = streams->Read8();
(wxTCPConnection *)ipcserv->OnAcceptConnection (topic_name); if ( msg == IPC_CONNECT )
if (new_connection)
{ {
if (new_connection->IsKindOf(CLASSINFO(wxTCPConnection))) const wxString topic = streams->ReadString();
wxTCPConnection *new_connection =
(wxTCPConnection *)ipcserv->OnAcceptConnection (topic);
if (new_connection)
{ {
// Acknowledge success if (new_connection->IsKindOf(CLASSINFO(wxTCPConnection)))
codeco->Write8(IPC_CONNECT); {
new_connection->m_topic = topic_name; // Acknowledge success
new_connection->m_sock = sock; out.Write8(IPC_CONNECT);
new_connection->m_sockstrm = stream;
new_connection->m_codeci = codeci; new_connection->m_sock = sock;
new_connection->m_codeco = codeco; new_connection->m_streams = streams;
sock->SetEventHandler(*gs_handler, _CLIENT_ONREQUEST_ID); new_connection->m_topic = topic;
sock->SetClientData(new_connection); sock->SetEventHandler(*gs_handler, _CLIENT_ONREQUEST_ID);
sock->SetNotify(wxSOCKET_INPUT_FLAG | wxSOCKET_LOST_FLAG); sock->SetClientData(new_connection);
sock->Notify(true); sock->SetNotify(wxSOCKET_INPUT_FLAG | wxSOCKET_LOST_FLAG);
return; sock->Notify(true);
} return;
else }
{ else
delete new_connection; {
// and fall through to delete everything else delete new_connection;
// and fall through to delete everything else
}
} }
} }
}
// Something went wrong, send failure message and delete everything // Something went wrong, send failure message and delete everything
codeco->Write8(IPC_FAIL); out.Write8(IPC_FAIL);
} // IPCOutput object is destroyed here, before destroying stream
delete codeco; delete streams;
delete codeci;
delete stream;
sock->Destroy(); sock->Destroy();
} }